Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

栏目: 服务器 · 发布时间: 5年前

内容简介:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。一句话的事儿: checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态为了保证state的容

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

1 何为状态

  • 在批处理过程中,数据是划分为块分片去完成的,然后每一个Task去处理一个分片。当分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于state的需求还是比较小的。

  • 在流计算过程中,对State有非常高的要求,因为在流系统中输入是一个无限制的流,会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来。

  • Flink的失败恢复依赖于“检查点机制+可部分重发的数据源”。

  • 检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

  • 可部分重发的数据源:Flink选择最近完成的检查点K。然后系统重放整个分布式的数据流,然后给予每个operator他们在检查点k快照中的状态。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。

  • Flink中有两种基本类型的State,即Keyed State和Operator State。

  • State可以被记录,在失败的情况下数据还可以恢复

一句话的事儿:state一般指一个具体的task/operator的状态【state数据默认保存在 java 的堆内存中】

2 检查点Checkpoint 与Barrier

一句话的事儿: checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提是: 持久化的source(如kafka),它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等) 用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

  • 单流的barrier:

    1: 屏障作为数据流的一部分随着记录被注入到数据流中。屏障永远不会赶超通常的流记录,它会严格遵循顺序。

    2: 屏障将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。

    3: 每一个屏障携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。

    4: 屏障不会中断流处理,因此非常轻量级。

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试
  • 并行barrier

    1:不止一个输入流的时的operator,需要在快照屏障上对齐(align)输入流,才会发射出去。 2:可以看到1,2,3会一直放在Input buffer,直到另一个输入流的快照到达Operator。

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

3 有状态的Operator工作一览图

Stateful Flink applications are optimized for local state access. Task state 
is always maintained in memory or, if the state size exceeds the available memory,
in access-efficient on-disk data structures. Hence, tasks perform all computations 
by accessing local, often in-memory, state yielding very low processing latencies.
Flink guarantees exactly-once state consistency in case of failures by periodically 
and asynchronously checkpointing the local state to durable storage.
复制代码
Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

4 状态管理

4.1 原始状态与托管状态

Keyed State和Operator State,可以以两种形式存在:

  • 原始状态(raw state)

  • 托管状态(managed state)

  • 托管状态是由Flink框架管理的状态

  • 原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

  • 通常在DataStream上的状态推荐使用托管的状态。

  • 当实现一个用户自定义的operator时,会使用到原始状态

4.2 State-Keyed State 是什么?直接上干货。(兄弟 State-Operator State)

  • 顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。 stream.keyBy(…)

  • state的数据结构;

    (1) ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值

    (2) ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值

    (3) ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值

    (4) MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素

  • 需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。实际上:这些状态有三种存储方式:

    MemoryStateBackend:
      FsStateBackend
      RockDBStateBackend
    复制代码

4.3 State-Keyed State 存储方式?直接上干货

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试
  • MemoryStateBackend

    state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中 基于内存的state backend在生产环境下不建议使用。

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试
  • FsStateBackend

    state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统。

  • RocksDBStateBackend

    RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时RocksDB需要配置一个远端的filesystem。

    uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。

    RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

4.4 State 生成快照

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

4.5 State 快照恢复

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

5 与Key相关的状态管理案例实战

5.1 RichFlatMapFunction 核心代码奉上

package xuwei.tech.streaming;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * qinkaixin 2018 11 24 
 */
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {


    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }

    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", 
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), 
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}
复制代码

5.2 RichFlatMapFunction 执行操作

public static void main(String[] args) throws Exception{
    //获取Flink的运行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(0)
            .flatMap(new CountWindowAverage())
            .print();
    env.execute("StafulOperator");
    System.out.println("***********");
}
复制代码

5.3 最终结果为什么是这样的?

  • if the count reaches 2, emit the average and clear the state
  • 所以Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L)一组
  • 所以Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)一组
Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

6 与Operator相关的State案例实战

  • 与Key无关的State,与Operator绑定的state,整个operator只对应一个state

  • 保存Operator state的数据结构为ListState

  • 举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射

  • 继承CheckpointedFunction,实现snapshotState和restoreState。

    To use managed operator state, a stateful function can implement either 
      the more general CheckpointedFunction interface, or the 
      ListCheckpointed<T extends Serializable> interface.
    
      Whenever a checkpoint has to be performed, snapshotState() is called. The 
      counterpart,initializeState(), is called every time the user-defined function 
      is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this,
      initializeState() is not only the place where different types of state are
      initialized, but also where state recovery
      logic is included.
    复制代码

6.1 BufferingSink案例

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
复制代码

6.2 Stateful Source案例

public static class CounterSource  extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}
复制代码

7 checkPoint的配置进一步升华

7.1 checkpoint 开关

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用
  • checkpoint开启之后,默认的checkPointMode是Exactly-once
  • checkpoint的checkPointMode有两种,Exactly-once和At-least-once
  • Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

7.1 checkpoint 调优配置(Cancel处理很有意思)

  • 默认checkpoint功能是disabled的,想要使用的时候需要先启用

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
      // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
      env.enableCheckpointing(1000);
      // 高级选项:
      // 设置模式为exactly-once (这是默认值)
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
      // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
      // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
      env.getCheckpointConfig().setCheckpointTimeout(60000);
      // 同一时间只允许进行一个检查点
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
      // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
      env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      
      cancel处理选项:
      (1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:
      表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定
      的Checkpoint
      
      (2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:
      表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会
      保存checkpoint
    复制代码

8 State Backend 状态的后端存储(一剑封喉)

8.1 配置说明

修改State Backend的两种方式

  • 第一种:单任务调整

    修改当前任务代码
      env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
      或者new MemoryStateBackend()
      或者new RocksDBStateBackend( hdfs->url, true);【需要添加第三方依赖】
    复制代码
  • 第二种:全局调整

    修改flink-conf.yaml
      state.backend: filesystem
      state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
      注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
    复制代码

8.2 精彩案例实战

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class SocketWindowWordCountJavaCheckPoint {

    public static void main(String[] args) throws Exception{
        //获取需要的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9010;
        }

        //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
        env.enableCheckpointing(1000);
        
        // 高级选项:
        // 设置模式为exactly-once (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        
        // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        
        // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
        
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        //设置statebackend

        //env.setStateBackend(new MemoryStateBackend());
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        String hostname = "SparkMaster";
        String delimiter = "\n";
        //连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c

        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
                .sum("count");//在这里使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //把数据打印到控制台并且设置并行度
        windowCounts.print().setParallelism(1);

        //这一行代码一定要实现,否则程序不执行
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "作者 : 秦凯新 , 窗大小2秒,滑动1秒       {" +
                    " word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
复制代码

8.3 精彩案例结果

Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试
Flink 状态管理与checkPoint数据容错机制深入剖析-Flink牛刀小试

9 华山论剑结束

这里围绕状态管理进行了详细的说明。一篇好文不容易,请发表你的评论,给予作者以肯定,谢谢。后续更精彩!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Rails 5敏捷开发

Rails 5敏捷开发

[美] Sam Ruby、[美] Dave Thomas、[美] David Heinemeier Hansson / 安道、叶炜、大疆Ruby技术团队 / 华中科技大学出版社 / 2017-12-30 / 115.00

本书以讲解“购书网站”案例为主线,逐步介绍Rails的内置功能。全书分为3部分,第一部分介绍Rails的安装、应用程序验证、Rails框架的体系结构,以及Ruby语言知识;第二部分用迭代方式构建应用程序,然后依据敏捷开发模式开展测试,最后用Capistrano完成部署;第三部分补充日常实用的开发知识。本书既有直观的示例,又有深入的分析,同时涵盖了Web开发各方面的知识,堪称一部内容全面而又深入浅出......一起来看看 《Rails 5敏捷开发》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换