如何将 Spark 的 shuffle 移植到自己业务

栏目: IT技术 · 发布时间: 4年前

1.ExternalSorter简介

ExternalSorter是用来 排序 及聚合key-value类型的数据。首先使用分区器将数据按照key进行分区,然后使用自定义的排序器在一个分区内对数据key进行排序。可以生成适合shuffle读取的分区文件。

如果禁用combiner,那么value的输入和输出类型要一致。

注意:ExternalSorter是一个比较通用的排序器,在sort-based shuffle中,可以用一些配置控制其一些特性,比如块儿压缩可以通过配置 spark.shuffle.compress来开启及关闭.假如在non-shuffle场景下使用了ExternalSorter,可能会需要重新读取该配置。

构造函数如下:

private[spark] class ExternalSorter[K, V, C](

context: TaskContext,

aggregator: Option[Aggregator[K, V, C]] = None,

partitioner: Option[Partitioner] = None,

ordering: Option[Ordering[K]] = None,

serializer: Serializer = SparkEnv.get.serializer)

extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())

参数介绍:

 * @param aggregator optional Aggregator with combine functions to use for merging data
 * @param partitioner optional Partitioner; if given, sort by partition ID and then key
 * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
 * @param serializer serializer to use when spilling to disk
  • aggregator用来完成聚合操作。

  • partitioner就是shuffle的算子的分区器。也是一个maptask,写数据输出给哪个reducer,由该分区器决定。

  • ordering排序器,可选,对key进行排序。

  • serializer用来在写入数据到磁盘的时候对数据进行序列化,读数据的时候要用他进行反序列化。

注意,假如设置了ordering参数,那么就必然会对数据进行按key排序,所以一定是要在需要排序的时候才设置。比如,在一个不需要map端合并的map操作中,为了避免不必要的排序,需要将ordering参数设置为None。另一方面,假如需要map端合并,那相对于none指定排序器会更加高效。

使用该类的步骤

  1. 实例化一个ExternalSorter。

  2. 调用insertAll(),并传入records数据集。

  3. 触发排序及合并。可以使用iterator()去对元素进行迭代排序或聚合。也可以调用writePartitionedFile()函数,创建已经排序或者聚合的文件,该文件适用于spark sort shuffle。

2.ExternalSorter的工作原理

首先,数据会不停的写入内存缓存区中,假如需要按照key对value进行聚合,则使用的是PartitionedAppendOnlyMap;假如不需要按照key对value进行聚合则使用PartitionedPairBuffer。在内存buffer内部,我们需要按照partition ID对元素进行排序,假如设置了key排序也会按照key对元素进行排序。为了避免频繁调用分区器,会在存储record的时候也存储partition ID 。

其次,假如缓存区达到了内存限制,就会将其溢写到磁盘存为一个文件。这些文件,首先会按照partition ID进行排序,假如需要聚合的话也会按照key或者key的hashcode进行排序。对于每个文件,我们会记录在内存里的时候每个分区有多少元素,所以没必要为每个元素写入partition ID。

然后,当用户调用iterator或者file输出函数的时候,已经溢写的文件就会连同内存的数据一起合并,会使用与前面相同的排序器。如果需要按照key对元素聚合,要么使用设置的排序器进行全局排序,要么读取有相同hashcode的key,然后对相同key的value进行聚合操作。

最后,当调用stop()函数的时候会删除所有的中间结果文件。

 3.案例

其实我们可以直接使用ExternalSorter,实际上就是一个map操作,使用指定的分区器,对数据按照key进行分区,然后会在同一个分区内使用聚合和排序算子,对key进行排序及聚合操作。

3.1 实例化ExternalSorter

val size = 400


val sparkConf = new SparkConf()

sparkConf.setMaster("local")

sparkConf.setAppName(this.getClass.getCanonicalName)


sparkConf.set("spark.shuffle.manager", "sort")

sparkConf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 40).toString)


val sc = new SparkContext(sparkConf)


val context = SparkUtils.fakeTaskContext(sc)


val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)

val ord = implicitly[Ordering[Int]]


// // Both aggregator and ordering

val sorter = new ExternalSorter[Int, Int, Int](

context, Some(agg), Some(new HashPartitioner(4)),Some(ord))

上面agg和ord是聚合器和排序器,两者均可以自定义,也可以设置为None,浪尖这里给了最简单的案例:key,value及聚合后的结果都是Int类型。

3.2 插入数据

浪尖这里是400条数据,key的范围是0-39,value范围是0-399.

<span>val elements = (0 until size).iterator.map { i =&gt; (i % 40, i) }</span>

<span>sorter.insertAll(elements)</span>

3.3 触发输出计算

可以按照分区将数据输出到console或者缓存到一个scala集合里。

sorter.partitionedIterator

.map(p => (p._1, p._2))

.filter(p=> p._1 == 0)

.flatMap(p=>p._2)

.foreach(println)

浪尖这里是获取了partition ID的为0的数据,并输出,结果如下:

(0,1800)
(4,1840)
(8,1880)
(12,1920)
(16,1960)
(20,2000)
(24,2040)
(28,2080)
(32,2120)
(36,2160)

这个计算过程,中间数据会落地到磁盘里的,触发溢写操作的的配置参数是:

sparkConf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 40).toString)

浪尖这里测试方便,达到10条就会触发刷磁盘,临时文件会在调用sorter.stop()之后删除。

要想看是否有中间文件,操作方法也很简单,spark的blockmanager提供了接口:

val beforeCleanUp = SparkUtils.getBlockManager(sc).diskBlockManager.getAllFiles().size

println(beforeCleanUp)

sorter.stop()

val afterCleanUp = SparkUtils.getBlockManager(sc).diskBlockManager.getAllFiles().size

println(afterCleanUp)

结果如下:

<span>36</span>

<span>0</span>

3.4 输出到文件

shuffle中间结果肯定是输出到blockmanager管理的,也是可以落地到磁盘,浪尖这里也给出让其落地磁盘操作案例。

sorter.partitionedIterator.map(p => (p._1, p._2)).filter(p=> p._1 == 0).flatMap(p=>p._2).foreach(println)


val outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "",new File("data/"))


// outputFile.deleteOnExit()

sorter.writePartitionedFile(ShuffleBlockId(0,0,0),outputFile).foreach(println)

执行之后会在工程的data目录下生成文件,文件是unsaferow及序列化的,不可以直接查看。

如何将 Spark 的 shuffle 移植到自己业务

3.5 读取溢写文件

sorter的writePartitionedFile方法,返回值是一个数组,数组的下标是 partition ID,元素是该分区数据的大小。读数据的时候由于sorter会将所有的分区数据写入同一个数据文件,其实spark shuffle里还有一个索引文件,浪尖这里是测试用的所有没有索引文件。

<span>[271,271,271,271]</span>

浪尖这里分区器是四个分区,数据设计的比较均匀,所以每个分区数据大小很均匀,都是271.

读取最后一个分区数据的方法,浪尖没有参照源码,给出比较简单的读取方法有兴趣的可以去源码里找找:

val serializer = SparkEnv.get.serializer.newInstance()

val input = new FileInputStream(new File("data/test-unsafe-row-serializer-spill7127803973846287207"))

input.skip(271+271+271)

val deserializer = serializer.deserializeStream(input)

try {

val rows = deserializer.asKeyValueIterator

while (rows.hasNext)

{

val (key,value)=rows.next();

println(key+":"+value)

}

} catch {

case ex: Exception => {

ex.printStackTrace() // 打印到标准err

System.err.println("exception===>: ...") // 打印到标准err

}

}

结果,第一个元素是key,第二个是聚合后的value,可以看到分区特点也很均匀key差值是4,由于排序的原因,所以key也是递增的,这是由于浪尖这个给的hashpartitioner分区数为4,且给了排序器的原因。

浪尖这里读取分区文件的时候由于分区segment之间有分隔符,所以会抛异常,而中止,这正好是给我们结束契机。

4. 代码补充

自己的类要包路径是org.apache.spark.文章提到的 工具 类是:

package org.apache.spark


import java.util.Properties


import org.apache.spark.memory.TaskMemoryManager

import org.apache.spark.storage.BlockManager


object SparkUtils {

def fakeTaskContext(sc: SparkContext): TaskContext = {

val env = sc.env

val taskMemoryManager = new TaskMemoryManager(env.memoryManager, 0)

new TaskContextImpl(

stageId = 0,

stageAttemptNumber = 0,

partitionId = 0,

taskAttemptId = 0,

attemptNumber = 0,

taskMemoryManager = taskMemoryManager,

localProperties = new Properties,

metricsSystem = env.metricsSystem)

}


def getBlockManager(sc:SparkContext):BlockManager = {

sc.env.blockManager

}

}

5.总结

这个思路主要来源于知识星球之前有人问过浪尖,数据集比较大,写分布式spark程序集成到自己的任务里有比较麻烦,所以想问问浪尖有没有好思路。

浪尖想自己实现基于磁盘的排序算法,实际上重复造轮子太复杂了,而且性能不知如何,所以想到利用spark shuffle的基于磁盘的排序操作,把它拿出来,然后使用起来。

其实想给大家的提醒是:

学一个框架源码,不要只停留在阅读理解,要分析总结化为己用,这样 理解才会比较深入 ,成长才会比较大。

推荐阅读:

必知|Scala类型层次结构

spark on yarn 内存分配详解

Apache Flink 与 Apache Hive 的集成

星球要突破1000人了~

如何将 Spark 的 shuffle 移植到自己业务


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms on Strings, Trees and Sequences

Algorithms on Strings, Trees and Sequences

Dan Gusfield / Cambridge University Press / 1997-5-28 / USD 99.99

String algorithms are a traditional area of study in computer science. In recent years their importance has grown dramatically with the huge increase of electronically stored text and of molecular seq......一起来看看 《Algorithms on Strings, Trees and Sequences》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码