Spark统一内存管理

栏目: 编程工具 · 发布时间: 4年前

内容简介:Spark1.6 以后,增加统一内存管理机制内存管理模块包括堆内内存(On-heap Memory),堆外内存(Off-heap Memory)两大区域。Executor Memory:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

Spark1.6 以后,增加统一内存管理机制内存管理模块包括堆内内存(On-heap Memory),堆外内存(Off-heap Memory)两大区域。

1.堆内内存

Spark统一内存管理

Executor Memory:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

Storage Memory:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据

User Memory:主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息

Reserved Memory:系统预留内存,会用来存储Spark内部对象

private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)

systemMemory:

val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

Runtime.getRuntime.maxMemory 就是JVM运行时的堆内存,在 Java 程序中通过 -Xmx -Xms 配置,spark中通过 spark.executor.memory--executor-memory 配置的。

useableMemory:spark可用内存

val usableMemory = systemMemory - reservedMemory

补充:

val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

execution Memory不得小于reservedMemory 的1.5倍。

2.堆外内存

Spark 1.6 开始引入了Off-heap memory,调用Java的Unsafe类API申请堆外的内存资源,这种方式不进行Java内存管理,可避免频繁GC,但需要自己实现内存申请和释放的逻辑。

Spark统一内存管理

Spark统一内存管理

3.堆内内存动态调整

初始化:程序提交时,execution和storage各占0.5(通过 spark.memory.storageFraction 配置)

onHeapStorageRegionSize =
    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

这意味着

  • 在程序运行时,如果双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU 规则进行的。若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
  • Storage 内存的空间被对方占用后,目前的实现是无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用。

注意,上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。

4.Task内存分配

/**
 * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
 * obtained, or 0 if none can be allocated.
 *  * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 *  * @param numBytes number of bytes to acquire
 * @param taskAttemptId the task attempt acquiring memory
 * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
 *                      one parameter (Long) that represents the desired amount of memory by
 *                      which this pool should be expanded.
 * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
 *                           at this given moment. This is not a field because the max pool
 *                           size is variable in certain cases. For instance, in unified
 *                           memory management, the execution pool can be expanded by evicting
 *                           cached blocks, thereby shrinking the storage pool.
 *  * @return the number of bytes granted to the task.
   */
  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }
memoryForTask
val minMemoryPerTask = poolSize / (2 * numActiveTasks)

5.Spark UI 数据解释

内存分配池的堆部分分为 Eden,Survivor 和 Tenured 三部分空间,而这里面一共包含了两个 Survivor 区域,而这两个 Survivor 区域在任何时候我们只能用到其中一个,所以我们可以使用下面的公式进行描述:

ExecutorMemory = Eden + 2 * Survivor + Tenured

Runtime.getRuntime.maxMemory的差异取决于GC配置

spark.executor.memory 设为1g,如图

Spark统一内存管理

384.1MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) × 页面以1000为换算单位(1000/1024 × 1000/1024)

Spark统一内存管理

366.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6)

加上1g堆外内存:

spark.memory.offHeap.enabled    true
spark.memory.offHeap.size       1G

Spark统一内存管理

1390.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB

Spark统一内存管理

1.5G ≈ 1457.8MB = ((Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB) × 页面以1000为换算单位(1000/1024 × 1000/1024)


以上所述就是小编给大家介绍的《Spark统一内存管理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Effective Modern C++ 简体中文版

Effective Modern C++ 简体中文版

Scott Meyers / 高博 / 中国电力出版社 / 2018-4-23 / 99

想要彻底理解C++11和C++14,不可止步于熟悉它们引入的语言特性(例如,auto型别推导、移动语义、lambda表达式以及并发支持)。挑战在于高效地运用这些特性——从而使你的软件具备正确性、高效率、可维护性和可移植性。这正是这本实用的图书意欲达成的定位。它描述的正是使用C++11和C++14——现代C++来撰写真正卓越的软件之道。 涵盖以下主题: 大括号初始化、noexcept规格......一起来看看 《Effective Modern C++ 简体中文版》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器