A look at heap size configuration for the Flink TaskManager

Category: Programming Tools · Published: 5 years ago

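This article looks at how the heap size of a Flink TaskManager is configured, following the setting from flink-conf.yaml through the startup scripts into the JVM arguments.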

flink-conf.yaml

flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml

# The heap size for the TaskManager JVM

taskmanager.heap.size: 1024m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
# 
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb
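  • flink-conf.yaml provides taskmanager.heap.size to set the combined heap and off-heap memory size of the TaskManager.
  • The taskmanager.memory options configure managed memory: taskmanager.memory.fraction, taskmanager.memory.off-heap, taskmanager.memory.preallocate, taskmanager.memory.segment-size, taskmanager.memory.size.
  • The taskmanager.network options configure the memory of the TaskManager's network stack: taskmanager.network.detailed-metrics, taskmanager.network.memory.buffers-per-channel, taskmanager.network.memory.floating-buffers-per-gate, taskmanager.network.memory.fraction, taskmanager.network.memory.max, taskmanager.network.memory.min.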
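
The size values in this file carry unit suffixes (1024m, 64mb, 1gb). As a rough illustration of how such strings map to bytes, here is a minimal sketch. The class MemorySizeSketch and its methods are hypothetical helpers written for this article, not Flink's own parser, and they only handle the suffixes that appear above. Treating a bare number as bytes is an assumption of this sketch.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; this is not Flink's MemorySize class.
class MemorySizeSketch {

    /** Parses strings such as "1024m", "64mb" or "1gb" into a number of bytes. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);

        // Split the leading digits from the trailing unit suffix.
        int pos = 0;
        while (pos < s.length() && Character.isDigit(s.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number found in '" + text + "'");
        }
        long value = Long.parseLong(s.substring(0, pos));
        String unit = s.substring(pos).trim();

        // Binary multipliers: 1m = 2^20 bytes, 1g = 2^30 bytes.
        switch (unit) {
            case "":   // assumption of this sketch: a bare number means bytes
            case "b":
                return value;
            case "k":
            case "kb":
                return value << 10;
            case "m":
            case "mb":
                return value << 20;
            case "g":
            case "gb":
                return value << 30;
            default:
                throw new IllegalArgumentException("Unknown unit '" + unit + "'");
        }
    }

    /** Converts bytes to whole mebibytes, rounding down. */
    static long toMebiBytes(long bytes) {
        return bytes >> 20;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("64mb"));               // 67108864
        System.out.println(parseBytes("1gb"));                // 1073741824
        System.out.println(toMebiBytes(parseBytes("1024m"))); // 1024
    }
}
```

The first two results match the byte values that config.sh uses as defaults for the network buffer minimum and maximum.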

config.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/config.sh

#!/usr/bin/env bash

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
KEY_TASKM_MEM_MB="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"

KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

# Define FLINK_TM_HEAP if it is not already set
if [ -z "${FLINK_TM_HEAP}" ]; then
    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_TM_HEAP}" == 0 ]; then
    FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")

    if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
    else
        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
    fi
fi

# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
fi

# Define FLINK_TM_OFFHEAP if it is not already set
if [ -z "${FLINK_TM_OFFHEAP}" ]; then
    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
    FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
fi


# Define FLINK_TM_NET_BUF_FRACTION if it is not already set
if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
    FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
fi

# Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
    if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
        FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
        FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
    fi
fi

# Define FLINK_TM_NET_BUF_MIN if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
    # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
    FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
fi

# Define FLINK_TM_NET_BUF_MAX if it is not already set
if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
    # default: 1GB = 1073741824 bytes
    FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
    FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
fi
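  • For each variable that is not already set, config.sh initializes it from flink-conf.yaml, falling back to a built-in default: FLINK_TM_HEAP, FLINK_TM_MEM_MANAGED_SIZE, FLINK_TM_MEM_MANAGED_FRACTION, FLINK_TM_OFFHEAP, FLINK_TM_MEM_PRE_ALLOCATE, FLINK_TM_NET_BUF_FRACTION and the related network buffer variables.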
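
The lookup order for the total TaskManager memory can be sketched as follows. This is only an illustration of the precedence visible in the script (environment variable first, then taskmanager.heap.size, then the deprecated taskmanager.heap.mb). The class HeapKeyResolutionSketch, its method and the use of a Map in place of the YAML file are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the lookup order; the real logic lives in config.sh.
class HeapKeyResolutionSketch {

    static final String KEY_SIZE = "taskmanager.heap.size";
    static final String KEY_MB = "taskmanager.heap.mb"; // deprecated key

    /** Returns the configured total TaskManager memory as a size string, or "0" if unset. */
    static String resolveTmHeap(String envValue, Map<String, String> conf) {
        // 1. An already-set FLINK_TM_HEAP environment variable wins.
        if (envValue != null && !envValue.isEmpty()) {
            return envValue;
        }
        // 2. Otherwise read the new key, defaulting to 0.
        String size = conf.getOrDefault(KEY_SIZE, "0");
        if (!"0".equals(size)) {
            return size;
        }
        // 3. Fall back to the deprecated key, which holds a plain number of megabytes.
        String mb = conf.getOrDefault(KEY_MB, "0");
        return "0".equals(mb) ? "0" : mb + "m";
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY_SIZE, "1024m");
        System.out.println(resolveTmHeap("512m", conf)); // 512m
        System.out.println(resolveTmHeap(null, conf));   // 1024m

        Map<String, String> legacy = new HashMap<>();
        legacy.put(KEY_MB, "2048");
        System.out.println(resolveTmHeap("", legacy));   // 2048m
    }
}
```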

taskmanager.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/taskmanager.sh

#!/usr/bin/env bash
# Start/stop a Flink TaskManager.
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=taskexecutor

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then

    # if memory allocation mode is lazy and no other JVM options are set,
    # set the 'Concurrent Mark Sweep GC'
    if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
        export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
    fi

    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
        echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
    else
        flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
        FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
    fi

    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then

        TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
        TM_MAX_OFFHEAP_SIZE="8388607T"

        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"

    fi

    # Add TaskManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

    # Startup parameters
    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi
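  • taskmanager.sh first sources config.sh to initialize the variables, then computes and exports JVM_ARGS and FLINK_ENV_JAVA_OPTS, and finally launches the entry point through flink-console.sh (for start-foreground) or flink-daemon.sh (otherwise).
  • If FLINK_TM_MEM_PRE_ALLOCATE is false and neither FLINK_ENV_JAVA_OPTS nor FLINK_ENV_JAVA_OPTS_TM is set, -XX:+UseG1GC is appended to JVM_ARGS (the inline comment still mentions Concurrent Mark Sweep, but the flag actually set is G1). FLINK_TM_HEAP is then converted into FLINK_TM_HEAP_MB. If FLINK_TM_HEAP_MB is greater than 0, calculateTaskManagerHeapSizeMB computes TM_HEAP_SIZE, which is used for both Xms and Xmx, while TM_MAX_OFFHEAP_SIZE is used for MaxDirectMemorySize; all three are appended to JVM_ARGS. FLINK_ENV_JAVA_OPTS_TM is appended to FLINK_ENV_JAVA_OPTS.
  • calculateTaskManagerHeapSizeMB is defined in config.sh; its Java counterpart is TaskManagerServices.calculateHeapSizeMB.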
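
To make the resulting command line concrete, the sketch below assembles the same JVM arguments that the script appends to JVM_ARGS. It is a simplified illustration, not a port of taskmanager.sh: the class JvmArgsSketch and its parameter names are hypothetical, and the already-calculated heap size is passed in rather than computed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of how taskmanager.sh extends JVM_ARGS.
class JvmArgsSketch {

    // Same upper-bound string that taskmanager.sh assigns to TM_MAX_OFFHEAP_SIZE.
    static final String MAX_DIRECT_MEMORY = "8388607T";

    /**
     * @param totalMb          total TaskManager memory in MB (FLINK_TM_HEAP_MB in the script)
     * @param calculatedHeapMb heap size after subtracting off-heap parts (TM_HEAP_SIZE)
     * @param preallocate      value of taskmanager.memory.preallocate
     * @param envJavaOpts      env.java.opts, empty string if unset
     * @param envJavaOptsTm    env.java.opts.taskmanager, empty string if unset
     */
    static List<String> buildJvmArgs(long totalMb, long calculatedHeapMb, boolean preallocate,
                                     String envJavaOpts, String envJavaOptsTm) {
        List<String> args = new ArrayList<>();

        // G1 is chosen only for lazy allocation when the user supplied no JVM options.
        if (!preallocate && envJavaOpts.isEmpty() && envJavaOptsTm.isEmpty()) {
            args.add("-XX:+UseG1GC");
        }

        // Heap flags are added only when a positive total size is configured.
        if (totalMb > 0) {
            args.add("-Xms" + calculatedHeapMb + "M");
            args.add("-Xmx" + calculatedHeapMb + "M");
            args.add("-XX:MaxDirectMemorySize=" + MAX_DIRECT_MEMORY);
        }
        return args;
    }

    public static void main(String[] args) {
        // prints: -XX:+UseG1GC -Xms922M -Xmx922M -XX:MaxDirectMemorySize=8388607T
        System.out.println(String.join(" ", buildJvmArgs(1024, 922, false, "", "")));
        // prints: -Xms922M -Xmx922M -XX:MaxDirectMemorySize=8388607T
        System.out.println(String.join(" ", buildJvmArgs(1024, 922, false, "-XX:+UseParallelGC", "")));
    }
}
```

Because Xms and Xmx are set to the same value, the heap is fixed at its computed size from startup.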

TaskManagerServices

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *         overall available memory to use (heap and off-heap)
     * @param config
     *         configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    /**
     * Calculates the amount of memory used for network buffers based on the total memory to use and
     * the according configuration parameters.
     *
     * <p>The following configuration parameters are involved:
     * <ul>
     *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
     *     <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
     *     <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
     *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
     * </ul>.
     *
     * @param totalJavaMemorySize
     *         overall available memory to use (heap and off-heap, in bytes)
     * @param config
     *         configuration object
     *
     * @return memory to use for network buffers (in bytes); at least one memory segment
     */
    @SuppressWarnings("deprecation")
    public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySize > 0);

        int segmentSize =
            checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

        final long networkBufBytes;
        if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
            // new configuration based on fractions of available memory with selectable min and max
            float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
            long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
            long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();


            TaskManagerServicesConfiguration
                .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);

            networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
                (long) (networkBufFraction * totalJavaMemorySize)));

            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                    "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                    "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                    "Network buffer memory size too large: " + networkBufBytes + " >= " +
                        totalJavaMemorySize + " (total JVM memory size)");
            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes >= segmentSize,
                    "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
                    "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
                        TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                    "Network buffer memory size too small: " + networkBufBytes + " < " +
                        segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
        } else {
            // use old (deprecated) network buffers parameter
            int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
            networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;

            TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);

            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
                    networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                    "Network buffer memory size too large: " + networkBufBytes + " >= " +
                        totalJavaMemorySize + " (total JVM memory size)");
            TaskManagerServicesConfiguration
                .checkConfigParameter(networkBufBytes >= segmentSize,
                    networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                    "Network buffer memory size too small: " + networkBufBytes + " < " +
                        segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
        }

        return networkBufBytes;
    }

    //......
}
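  • FLINK_TM_HEAP sets the TaskManager's total memory, heap plus off-heap. Network buffers always live off-heap, so that portion has to be subtracted from FLINK_TM_HEAP before Xms and Xmx are computed.
  • calculateHeapSizeMB first calls calculateNetworkBufferMemory to obtain networkBufMB, then subtracts networkBufMB from totalJavaMemorySizeMB to get remainingJavaMemorySizeMB.
  • It then reads taskmanager.memory.off-heap, which defaults to false. When false, remainingJavaMemorySizeMB is returned directly. When true, offHeapSize is computed (from taskmanager.memory.size if set, otherwise from taskmanager.memory.fraction) and subtracted from remainingJavaMemorySizeMB before returning.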
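
The arithmetic described above can be worked through with the default values seen in config.sh (network fraction 0.1, network min 64 MB, network max 1 GB, managed fraction 0.7). The sketch below is a stripped-down restatement of the two methods, assuming the fraction-based network configuration and omitting all validation. The class HeapSizeSketch and its flat parameter list are hypothetical and replace Flink's Configuration object.

```java
// Hypothetical, simplified restatement of the heap calculation; no validation is performed.
class HeapSizeSketch {

    /** Network buffer memory in bytes: a fraction of the total, clamped to [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        return Math.min(maxBytes, Math.max(minBytes, (long) (fraction * totalBytes)));
    }

    /** Heap size in MB that would end up in -Xms and -Xmx. */
    static long heapSizeMB(long totalMB, double netFraction, long netMinBytes, long netMaxBytes,
                           boolean offHeap, long managedSizeMB, double managedFraction) {
        // Network buffers are always off-heap, so they are subtracted first.
        long networkMB = networkBufferBytes(totalMB << 20, netFraction, netMinBytes, netMaxBytes) >> 20;
        long remainingMB = totalMB - networkMB;

        if (!offHeap) {
            return remainingMB;
        }
        // With off-heap managed memory, use the explicit size if positive, else the fraction.
        long offHeapMB = managedSizeMB > 0 ? managedSizeMB : (long) (managedFraction * remainingMB);
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        long min = 64L << 20; // 64 MB
        long max = 1L << 30;  // 1 GB

        // 1024 MB total: network = 102 MB, heap = 1024 - 102
        System.out.println(heapSizeMB(1024, 0.1, min, max, false, 0, 0.7)); // 922
        // Same, with off-heap managed memory: 922 - (long) (0.7 * 922) = 922 - 645
        System.out.println(heapSizeMB(1024, 0.1, min, max, true, 0, 0.7));  // 277
        // 512 MB total: 10% is below the 64 MB minimum, so heap = 512 - 64
        System.out.println(heapSizeMB(512, 0.1, min, max, false, 0, 0.7));  // 448
    }
}
```

So with the shipped default of taskmanager.heap.size: 1024m and off-heap disabled, the JVM would be started with roughly -Xms922M -Xmx922M rather than 1024M.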

Summary

  • flink-conf.yaml provides taskmanager.heap.size to set the combined heap and off-heap memory size. The taskmanager.memory options (taskmanager.memory.fraction, taskmanager.memory.off-heap, taskmanager.memory.preallocate, taskmanager.memory.segment-size, taskmanager.memory.size) configure managed memory, and the taskmanager.network options (taskmanager.network.detailed-metrics, taskmanager.network.memory.buffers-per-channel, taskmanager.network.memory.floating-buffers-per-gate, taskmanager.network.memory.fraction, taskmanager.network.memory.max, taskmanager.network.memory.min) configure the memory of the TaskManager's network stack.
  • taskmanager.sh sources config.sh to initialize the variables, computes and exports JVM_ARGS and FLINK_ENV_JAVA_OPTS, and launches the entry point through flink-console.sh or flink-daemon.sh. It appends -XX:+UseG1GC to JVM_ARGS when FLINK_TM_MEM_PRE_ALLOCATE is false and neither FLINK_ENV_JAVA_OPTS nor FLINK_ENV_JAVA_OPTS_TM is set. When FLINK_TM_HEAP_MB is greater than 0, it computes TM_HEAP_SIZE via calculateTaskManagerHeapSizeMB (defined in config.sh, with TaskManagerServices.calculateHeapSizeMB as its Java counterpart) and appends Xms, Xmx and MaxDirectMemorySize to JVM_ARGS. FLINK_ENV_JAVA_OPTS_TM is appended to FLINK_ENV_JAVA_OPTS.
  • FLINK_TM_HEAP is the TaskManager's total heap plus off-heap memory. Since network buffers are always off-heap, calculateHeapSizeMB first subtracts networkBufMB (from calculateNetworkBufferMemory) from totalJavaMemorySizeMB to get remainingJavaMemorySizeMB. If taskmanager.memory.off-heap is false (the default), that value is returned; if true, offHeapSize is computed and subtracted as well.

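The final JVM parameters are therefore determined by JVM_ARGS and FLINK_ENV_JAVA_OPTS. Avoid putting memory-related options into JVM_ARGS yourself: whenever FLINK_TM_HEAP_MB is greater than 0, taskmanager.sh uses it to compute TM_HEAP_SIZE and appends the resulting Xms and Xmx to JVM_ARGS. FLINK_TM_HEAP_MB in turn comes from the FLINK_TM_HEAP environment variable or the taskmanager.heap.size setting, while FLINK_ENV_JAVA_OPTS comes from env.java.opts and env.java.opts.taskmanager.

To size the TaskManager's heap plus off-heap memory, either set the FLINK_TM_HEAP environment variable (for example FLINK_TM_HEAP=512m) or set taskmanager.heap.size in flink-conf.yaml. The final Xms and Xmx are FLINK_TM_HEAP minus the off-heap part: network buffers are always off-heap, and managed memory is subtracted as well only if taskmanager.memory.off-heap is enabled (it defaults to false).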
