Spark ShuffleManager内存缓冲器BypassMergeSortShuffleWriter设计思路剖析-Spark商业环境实战

栏目: 编程工具 · 发布时间: 7年前

内容简介:本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。一张图我已经用过多次了,不要见怪,因为毕竟都是一个主题,有关shuffle的。英文注释已经很详细了,这里简单介绍一下:官方英文介绍如下:

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客。版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习。

1 从ShuffeManager讲起

一张图我已经用过多次了,不要见怪,因为毕竟都是一个主题,有关shuffle的。英文注释已经很详细了,这里简单介绍一下:

  • 目前只有一个实现 SortShuffleManager。
  • SortShuffleManager依赖于ShuffleWriter提供服务,通过ShuffleWriter定义的规范,可以将MapTask的任务中间结果按照约束的规范持久化到磁盘。
  • SortShuffleManager总共有三个子类, UnsafeShuffleWriter,SortShuffleWriter ,BypassMergeSortShuffleWriter。
  • SortShuffleManager依赖于ShuffleHandle样例类,主要还是负责向Task传递Shuffle信息。一个是序列化,一个是确定何时绕开合并和排序的Shuffle路径。

官方英文介绍如下:

* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the 
     * driver and on each executor, based on the spark.shuffle.manager setting. The driver 
     * registers shuffles with it, and executors (or tasks running locally in the driver) can ask * to read and write data.
     
     * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
     * boolean isDriver as parameters.
复制代码
Spark ShuffleManager内存缓冲器BypassMergeSortShuffleWriter设计思路剖析-Spark商业环境实战

1 华山论剑之BypassMergeSortShuffleWriter

从命名来看,绝对是投机取巧,绕开合并和排序的ShuffleWriter,姑且称之为投机侠吧。

* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
 * writes incoming records to separate files, one file per reduce partition, then concatenates these
 * per-partition files to form a single output file, regions of which are served to reducers.
 * Records are not buffered in memory. It writes output in a format
复制代码

2 华山论剑之成员力量

BypassMergeSortShuffleWriter可是直接开挂的节奏,完全没有什么排序器啊,我来承担一切。我最屌,我承担一切,心声,嘿嘿。

2.1 BypassMergeSortShuffleWriter的孩子:

  • partitionWriters : 看看初始化为数组 ==> private DiskBlockObjectWriter[] partitionWriters,每一个DiskBlockObjectWriter负责处理一个分区的数据。

  • private final int fileBufferSize ==>文件缓冲大小,通过Spark.shuffle.file.buffer属性配置,默认是32KB。

  • private final boolean transferToEnabled => 是否采用NIO的从文件流待文件流的复制方式,spark.file.transferTo属性配置,默认是true。

  • private final int numPartitions => 分区数

  • private final BlockManager blockManager

  • private final Partitioner partitioner => 分区计算器

  • private final ShuffleWriteMetrics writeMetrics

  • private final int shuffleId;

  • private final int mapId ==>map任务的身份标识。

  • private final Serializer serializer;

  • private final IndexShuffleBlockResolver shuffleBlockResolver

  • private FileSegment[] partitionWriterSegments ==>FileSegment数组,每一个DiskBlockObjectWriter对应一个分区,也因此对应一个处理的文件片。

  • @Nullable private MapStatus mapStatus;

  • private long[] partitionLengths;

2 BypassMergeSortShuffleWriter核心实现方法Writer

先欣赏代码段:

public void write(Iterator<Product2<K, V>> records) throws IOException {
        assert (partitionWriters == null);
        if (!records.hasNext()) {
          partitionLengths = new long[numPartitions];
          shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
          mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
          return;
        }
        final SerializerInstance serInstance = serializer.newInstance();
        final long openStartTime = System.nanoTime();
        
        partitionWriters = new DiskBlockObjectWriter[numPartitions];    <=点睛之笔
        partitionWriterSegments = new FileSegment[numPartitions];       <=点睛之笔
        
        for (int i = 0; i < numPartitions; i++) {              <=点睛之笔(按照分区来写片段)
          final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
            blockManager.diskBlockManager().createTempShuffleBlock();
          final File file = tempShuffleBlockIdPlusFile._2();
          final BlockId blockId = tempShuffleBlockIdPlusFile._1();
          
          partitionWriters[i] =                     <=点睛之笔(得到不同分区的DiskBlockObjectWriter)
            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
        }                                           
        
        
        // Creating the file to write to and creating a disk writer both involve interacting with
        // the disk, and can take a long time in aggregate when we open many files, so should be
        // included in the shuffle write time.
        writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
    
        while (records.hasNext()) {
          final Product2<K, V> record = records.next();
          final K key = record._1();
          partitionWriters[partitioner.getPartition(key)].write(key, record._2());
        }
    
        for (int i = 0; i < numPartitions; i++) {
          final DiskBlockObjectWriter writer = partitionWriters[i];
          partitionWriterSegments[i] = writer.commitAndGet();  <= 生成一堆临时文件,写入到磁盘
          writer.close();
        }
    
        File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);  <==获取一堆临时文件
        File tmp = Utils.tempFileWith(output);
        try {
        
        
          partitionLengths = writePartitionedFile(tmp);   <==多个分区文件合并
          shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
                                                          <==生成索引
                                                          
                                                          
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
          }
        }
        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
      }
复制代码

2 BypassMergeSortShuffleWriter核心实现方法writePartitionedFile

聚合每一个分区文件为正式的Block文件

Concatenate all of the per-partition files into a single combined file.

    private long[] writePartitionedFile(File outputFile) throws IOException {
        // Track location of the partition starts in the output file
        final long[] lengths = new long[numPartitions];
        if (partitionWriters == null) {
          // We were passed an empty iterator
          return lengths;
        }
    
        final FileOutputStream out = new FileOutputStream(outputFile, true);
        final long writeStartTime = System.nanoTime();
        boolean threwException = true;
        try {
          for (int i = 0; i < numPartitions; i++) {
            final File file = partitionWriterSegments[i].file();
            if (file.exists()) {
              final FileInputStream in = new FileInputStream(file);
              boolean copyThrewException = true;
              try {
                lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
                copyThrewException = false;
              } finally {
                Closeables.close(in, copyThrewException);
              }
              if (!file.delete()) {
                logger.error("Unable to delete file for partition {}", i);
              }
            }
          }
          threwException = false;
        } finally {
          Closeables.close(out, threwException);
          writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
        }
        partitionWriters = null;
        return lengths;
      }
复制代码

3 BypassMergeSortShuffleWriter核心shuffle write流程

  • 根据分区ID,为每一个分区创建DiskBlockObjectWriter
  • 按照分区ID升序写入正式的Shuffle数据文件
  • 最终通过writeIndexFileAndCommit建立MapTask输出的数据索引

不废话,这张图简直画的太好了,望原图作者看到留言于我。

Spark ShuffleManager内存缓冲器BypassMergeSortShuffleWriter设计思路剖析-Spark商业环境实战

以上所述就是小编给大家介绍的《Spark ShuffleManager内存缓冲器BypassMergeSortShuffleWriter设计思路剖析-Spark商业环境实战》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Artificial Intelligence

Artificial Intelligence

Stuart Russell、Peter Norvig / Pearson / 2009-12-11 / USD 195.00

The long-anticipated revision of this #1 selling book offers the most comprehensive, state of the art introduction to the theory and practice of artificial intelligence for modern applications. Intell......一起来看看 《Artificial Intelligence》 这本书的介绍吧!

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具