聊聊flink的slot.request.timeout配置

栏目: 编程工具 · 发布时间: 6年前

内容简介:本文主要研究一下flink的slot.request.timeout配置flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurat

本文主要研究一下flink的slot.request.timeout配置

JobManagerOptions

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The timeout in milliseconds for requesting a slot from Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
        key("slot.request.timeout")
        .defaultValue(5L * 60L * 1000L)
        .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");

    //......
}
  • slot.request.timeout默认为5分钟

SlotManagerConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java

public class SlotManagerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);

    private final Time taskManagerRequestTimeout;
    private final Time slotRequestTimeout;
    private final Time taskManagerTimeout;

    public SlotManagerConfiguration(
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    }

    public Time getTaskManagerRequestTimeout() {
        return taskManagerRequestTimeout;
    }

    public Time getSlotRequestTimeout() {
        return slotRequestTimeout;
    }

    public Time getTaskManagerTimeout() {
        return taskManagerTimeout;
    }

    public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
        final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
        final Time rpcTimeout;

        try {
            rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
        } catch (NumberFormatException e) {
            throw new ConfigurationException("Could not parse the resource manager's timeout " +
                "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
        }

        final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
        final Time taskManagerTimeout = Time.milliseconds(
                configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

        return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
    }

    private static Time getSlotRequestTimeout(final Configuration configuration) {
        final long slotRequestTimeoutMs;
        if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
            LOGGER.warn("Config key {} is deprecated; use {} instead.",
                ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
                JobManagerOptions.SLOT_REQUEST_TIMEOUT);
            slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
        } else {
            slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
        }
        return Time.milliseconds(slotRequestTimeoutMs);
    }
}
  • SlotManagerConfiguration的getSlotRequestTimeout方法会从配置文件读取JobManagerOptions.SLOT_REQUEST_TIMEOUT

SlotManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

public class SlotManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    private ScheduledFuture<?> taskManagerTimeoutCheck;

    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    public SlotManager(
            ScheduledExecutor scheduledExecutor,
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkTaskManagerTimeouts()),
            0L,
            taskManagerTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkSlotRequestTimeouts()),
            0L,
            slotRequestTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);
    }

    /**
     * Suspends the component. This clears the internal state of the slot manager.
     */
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutCheck != null) {
            taskManagerTimeoutCheck.cancel(false);
            taskManagerTimeoutCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            unregisterTaskManager(registeredTaskManager);
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

    public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

            return false;
        } else {
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }

    private void checkSlotRequestTimeouts() {
        if (!pendingSlotRequests.isEmpty()) {
            long currentTime = System.currentTimeMillis();

            Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

            while (slotRequestIterator.hasNext()) {
                PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

                if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                    slotRequestIterator.remove();

                    if (slotRequest.isAssigned()) {
                        cancelPendingSlotRequest(slotRequest);
                    }

                    resourceActions.notifyAllocationFailure(
                        slotRequest.getJobId(),
                        slotRequest.getAllocationId(),
                        new TimeoutException("The allocation could not be fulfilled in time."));
                }
            }
        }
    }

    //......

}
  • SlotManager的构造器接收slotRequestTimeout参数;它维护了pendingSlotRequests的map;start方法会注册slotRequestTimeoutCheck,每隔slotRequestTimeout的时间调度一次,执行的是checkSlotRequestTimeouts方法;suspend方法会cancel这些pendingSlotRequest,然后情况pendingSlotRequests的map
  • registerSlotRequest方法会先执行checkDuplicateRequest判断是否有重复,没有重复的话,则将该slotRequest维护到pendingSlotRequests,然后调用internalRequestSlot进行分配,如果出现异常则从pendingSlotRequests中异常,然后抛出SlotManagerException
  • checkSlotRequestTimeouts则会遍历pendingSlotRequests,然后根据slotRequest.getCreationTimestamp()及当前时间判断时间差是否大于等于slotRequestTimeout,已经超时的话,则会从pendingSlotRequests中移除该slotRequest,然后进行cancel,同时触发resourceActions.notifyAllocationFailure

小结

  • SlotManagerConfiguration的getSlotRequestTimeout方法会从配置文件读取JobManagerOptions.SLOT_REQUEST_TIMEOUT;slot.request.timeout默认为5分钟
  • SlotManager的构造器接收slotRequestTimeout参数;它维护了pendingSlotRequests的map;start方法会注册slotRequestTimeoutCheck,每隔slotRequestTimeout的时间调度一次,执行的是checkSlotRequestTimeouts方法;suspend方法会cancel这些pendingSlotRequest,然后情况pendingSlotRequests的map
  • registerSlotRequest方法会先执行checkDuplicateRequest判断是否有重复,没有重复的话,则将该slotRequest维护到pendingSlotRequests,然后调用internalRequestSlot进行分配,如果出现异常则从pendingSlotRequests中异常,然后抛出SlotManagerException;checkSlotRequestTimeouts则会遍历pendingSlotRequests,然后根据slotRequest.getCreationTimestamp()及当前时间判断时间差是否大于等于slotRequestTimeout,已经超时的话,则会从pendingSlotRequests中移除该slotRequest,然后进行cancel,同时触发resourceActions.notifyAllocationFailure

doc


以上所述就是小编给大家介绍的《聊聊flink的slot.request.timeout配置》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Nine Algorithms That Changed the Future

Nine Algorithms That Changed the Future

John MacCormick / Princeton University Press / 2011-12-27 / GBP 19.95

Every day, we use our computers to perform remarkable feats. A simple web search picks out a handful of relevant needles from the world's biggest haystack: the billions of pages on the World Wide Web.......一起来看看 《Nine Algorithms That Changed the Future》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器