Finally! The Local Aggregation design document you have been asking about is here!



In several conference talks, we mentioned that our team had implemented the Local aggregation feature on our internal Flink fork, and many readers have since asked when it would be contributed back to the community and what its current status is. Yesterday we submitted this feature to the Flink community and received quite a lot of positive feedback. For details, click "Read the original" or search for "[DISCUSS] Support Local Aggregation in Flink" in the Flink dev mailing list. The original text of our design document is posted below. If you think this feature would also help your use cases, you are welcome to join the thread and take part in the discussion. We would of course be happy to get your "+1", but suggestions and comments are just as welcome.

Support Local Aggregation in Flink

1. Introduction

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on elements that have the same key. At runtime, elements with the same key are sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. When the distribution of keys follows a power law, performance degrades significantly. Worse still, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely-adopted method to mitigate the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. In the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, performance is further improved by the reduction in the amount of transferred data.
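
To make the decomposition concrete, here is a minimal, framework-free sketch of the two phases for sum. All the names are illustrative and not part of the proposal.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TwoPhaseSum {

        // Phase 1 (sender side): fold incoming elements into one partial
        // result per key, so a sender emits at most one record per key.
        static Map<String, Long> localAggregate(List<Map.Entry<String, Long>> elements) {
            Map<String, Long> partials = new HashMap<>();
            for (Map.Entry<String, Long> e : elements) {
                partials.merge(e.getKey(), e.getValue(), Long::sum);
            }
            return partials;
        }

        // Phase 2 (receiver side): combine the partial results of the same
        // key; a receiver sees at most one record per key from each sender.
        static Map<String, Long> combine(List<Map<String, Long>> partialsFromSenders) {
            Map<String, Long> finals = new HashMap<>();
            for (Map<String, Long> partials : partialsFromSenders) {
                partials.forEach((k, v) -> finals.merge(k, v, Long::sum));
            }
            return finals;
        }
    }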

Note that to achieve the benefits of local aggregation, the aggregated result must be obtainable through decomposition and combination. This condition is satisfied by many common aggregating operations, e.g., sum, count and topN. A few other aggregating operations, like cardinality, cannot be easily decomposed and combined, and hence will not benefit from local aggregation.

In this document, we propose a method to support local aggregation in Flink. We first give a brief introduction to our design in Section 2 and then detail the implementation in Section 3.

2. Design

Users can perform local aggregation with local keyed streams. Local keyed streams resemble keyed streams in many respects. They both partition elements according to keys and allow access to states affiliated with keys. But the partitioning in local keyed streams does not require shuffling and is performed locally.

Figure 1. Local aggregation.

The difference in the partitioning scheme makes it non-trivial to save and restore local keyed states, especially when the degree of parallelism changes. In keyed streams, keys are assigned to a set of key groups and we can easily redistribute these key groups to tasks when the degree of parallelism changes. But in local keyed streams, as elements are partitioned locally, each task has a full range of key groups. When the degree of parallelism changes, we have to redistribute the key groups of old tasks and construct new key groups for new tasks.

Figure 2. Saving and restoring of local keyed states.

Figure 2 illustrates how local keyed states are saved and restored when the degree of parallelism changes. Similar to the operators on keyed streams, after materializing local keyed states in persistent storage, the operators on local keyed streams respond to the checkpoint coordinator with a state handle containing the meta information of their local key groups. Suppose the parallelism is 3 and the maximum parallelism is 12; since each task has the full range of key groups, there are 36 local key groups in the checkpoint.

2.1 Distributing Local Key Groups

When the degree of parallelism changes, the checkpoint coordinator will assign these local key groups to new tasks in a roughly even manner. It iterates over all state handles and assigns them to new tasks according to the number of key groups. A state handle is split if only a portion of its key groups is assigned.
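
The arithmetic of the assignment can be sketched as follows (a hypothetical helper, not the actual coordinator code):

    import java.util.Arrays;

    public class KeyGroupDistribution {

        // Split `total` local key groups across `newParallelism` tasks as
        // evenly as possible; the first `rest` tasks take one extra group.
        static int[] keyGroupCounts(int total, int newParallelism) {
            int[] counts = new int[newParallelism];
            int base = total / newParallelism;
            int rest = total % newParallelism;
            for (int i = 0; i < newParallelism; i++) {
                counts[i] = base + (i < rest ? 1 : 0);
            }
            return counts;
        }

        public static void main(String[] args) {
            // 3 old tasks x 12 key groups each = 36 local key groups.
            // Restoring to parallelism 2 assigns 18 groups to each new task,
            // so one old state handle of 12 groups must be split.
            System.out.println(Arrays.toString(keyGroupCounts(36, 2))); // [18, 18]
        }
    }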

In the illustrated example, the state handle of Task 2 is split into 2 parts, each of which contains the meta information of 6 local key groups. Task 1' then restores its local keyed states with the state handle of Task 1 and the first part of the state handle of Task 2.

2.2 Merging Local Key Groups

When restoring, a task may be assigned multiple local key groups with the same ID. These instances must be merged to construct the new key group.

The merging of list states is quite straightforward. We can simply merge two list states of the same key by appending the elements of one list state to the other.
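
As a trivial sketch (plain Java, not actual state-backend code):

    import java.util.ArrayList;
    import java.util.List;

    public class ListStateMerge {

        // Merging two restored list-state values of the same key is plain
        // concatenation; no user-defined function is needed.
        static <E> List<E> merge(List<E> a, List<E> b) {
            List<E> merged = new ArrayList<>(a);
            merged.addAll(b);
            return merged;
        }
    }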

Reducing states and aggregating states can also be merged with the help of user-defined functions. But currently these user-defined functions are not saved in checkpoints, which makes it impossible to perform the merging when restoring.

An obvious first solution is to save these user-defined functions in checkpoints. To remain backward compatible, this solution would require a different metadata format for the snapshots of local keyed states.

We can also perform lazy merging to avoid saving user-defined functions. When restoring from checkpoints, we keep the values from different key group instances in a list and perform the merging when they are accessed. That way, local keyed states and keyed states can share the same metadata format.

Value states and map states can also be merged if users provide user-defined merge functions in the state descriptors. But the need for merging value and map states is not urgent: in most cases where a merge function can be provided, users can replace value and map states with reducing and aggregating states.

3. Implementation

We detail the implementation to support local aggregation in this section.

3.1 API

The following methods will be added to DataStream. They are the counterparts of keyBy for local keyed streams.
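
The method list itself is not preserved in this copy of the document. As a sketch, the additions plausibly mirror the existing keyBy overloads (a KeySelector variant plus position- and field-based variants). The hypothetical pipeline below shows how local and global aggregation would combine; note that localKeyBy is the proposed method and is not available in released Flink.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalAggregationExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Long>> source =
                    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

            source
                    .localKeyBy(0)   // proposed: partition by key locally, without shuffling
                    .countWindow(5)  // bound how long partial results are buffered
                    .sum(1)          // phase 1: partial sums on the sender side
                    .keyBy(0)        // shuffle the (far fewer) partial results
                    .sum(1)          // phase 2: combine partials into final sums
                    .print();

            env.execute("Local aggregation sketch");
        }
    }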

3.2 Compiling

In the local keyed streams produced by localKeyBy, the partition transformation deploys LocalKeyGroupStreamPartitioner instead of KeyGroupStreamPartitioner to partition stream elements.

Since all operators on local keyed streams are executed locally, these operators must be chained with their inputs. As with ForwardPartitioner, we will check the parallelism of the upstream and downstream nodes of LocalKeyGroupStreamPartitioners when generating stream graphs. An exception is thrown if their parallelisms are not equal.
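
A sketch of that validation (hypothetical names; the real check would live in the stream graph generator):

    public class ChainingCheck {

        // Local keyed operators must run in the same task as their inputs,
        // so both sides of a LocalKeyGroupStreamPartitioner edge must have
        // equal parallelism.
        static void checkParallelism(int upstreamParallelism, int downstreamParallelism) {
            if (upstreamParallelism != downstreamParallelism) {
                throw new UnsupportedOperationException(
                        "Operators on local keyed streams must be chained with their inputs: "
                                + "upstream parallelism " + upstreamParallelism
                                + " != downstream parallelism " + downstreamParallelism);
            }
        }
    }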

When generating job graphs, an operator will be chained with its input if the partitioner is of type LocalKeyGroupStreamPartitioner.

Besides the selector and the serializer for keys, the scope of key partitioning will also be written into the operator's configuration. The scope lets the operator know whether it is executed on a keyed stream or on a local keyed stream.

3.3 Executing

The access to local keyed states is very similar to the access to keyed states, except that

  • The key group range is full for all tasks, and

  • The access to value and map states is disallowed.

If the merging of reducing and aggregating states is performed lazily, we should use lists to store the values of reducing and aggregating states. When accessing these states, we iterate over all the elements in the list and merge them to produce the result. The extra merging operation only happens at the first access after restoring; in all other cases the list contains a single element and no merging is needed.
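
A sketch of that first-access merge for a reducing state, assuming the user's ReduceFunction is available again at runtime (names are illustrative):

    import java.util.List;
    import org.apache.flink.api.common.functions.ReduceFunction;

    public class LazyMerge {

        // Merge the values restored from different instances of the same key
        // group, then collapse the list so later accesses skip the merge.
        static <V> V get(List<V> values, ReduceFunction<V> reducer) throws Exception {
            V result = values.get(0);
            for (int i = 1; i < values.size(); i++) {
                result = reducer.reduce(result, values.get(i));
            }
            values.subList(1, values.size()).clear();
            values.set(0, result);
            return result;
        }
    }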

3.4 Checkpointing and Restoring

The materializing of local keyed states is also similar to that of keyed states, and hence can share the same code.

When recovering from failures, the checkpoint coordinator will distribute the state handles of local keyed states to tasks according to the method described in Section 2.1.

When restoring from local keyed state handles, we iterate over all key-value pairs in the assigned key groups. In the cases where the merge functions are saved in local keyed state handles, we merge the values of the same key with the corresponding merge functions. In the cases where merging is performed lazily, we simply append the values to the lists of their keys.
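
The two restore behaviors can be sketched together (a hypothetical helper; a ReduceFunction stands in for the saved merge function):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.flink.api.common.functions.ReduceFunction;

    public class RestoreSketch {

        // Insert one restored key-value pair. If a merge function was saved
        // in the state handle, merge eagerly; otherwise append the value and
        // leave the merging to the first access (lazy merging).
        static <K, V> void restore(Map<K, List<V>> state, K key, V value,
                                   ReduceFunction<V> mergeFnOrNull) throws Exception {
            List<V> values = state.computeIfAbsent(key, k -> new ArrayList<>());
            if (mergeFnOrNull != null && !values.isEmpty()) {
                values.set(0, mergeFnOrNull.reduce(values.get(0), value));
            } else {
                values.add(value);
            }
        }
    }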
