Flink Source Code Analysis, Starting from an Example: Understanding the Local Job Execution Flow

Category: Server · Published: 5 years ago


WeChat official account: 深广大数据Club


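Following the previous article, 《Flink源码解析 | 从Example出发理解Flink-Flink启动》 ("Flink Source Code Analysis | Understanding Flink from an Example: Flink Startup"), this article walks through how the SocketWindowWordCount example that ships with Apache Flink is implemented and executed.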

SocketWindowWordCount

First, let's look at the key code of SocketWindowWordCount:

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy("word")
                .timeWindow(Time.seconds(5))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");

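The code above breaks down into the following parts:

  • Obtain the execution environment

  • Create the DataStream object from the socket

  • Process the data to produce windowCounts

    • Map: flatMap splits each line into words

    • Transformation: keyBy groups by word, and timeWindow assigns 5-second windows

    • reduce sums the counts per word

  • Print the result

  • Call env.execute to run the job

The FlatMapFunction and ReduceFunction used here can be implemented to fit your own business scenario.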
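To make these transformations concrete without a running Flink job, here is a plain-Java sketch (no Flink dependency) of what flatMap, keyBy and reduce compute for the lines that arrive within a single window. The class name WindowWordCountSketch and the method countWindow are made up for this illustration. The real job does this incrementally, once per 5-second window, inside Flink's operators.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowWordCountSketch {

    // Hypothetical helper: counts the words of all lines that fell into one window.
    static Map<String, Long> countWindow(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            // flatMap: one line in, zero or more (word, 1) records out
            for (String word : line.split("\\s")) {
                // keyBy("word") + reduce: records sharing a word are summed
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> result = countWindow(List.of("hello flink", "hello world"));
        result.forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

Swapping the body of the inner loop is the plain-Java equivalent of supplying your own FlatMapFunction or ReduceFunction.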

StreamExecutionEnvironment实例化

public static StreamExecutionEnvironment getExecutionEnvironment() {
        if (contextEnvironmentFactory != null) {
            return contextEnvironmentFactory.createExecutionEnvironment();
        }

        // because the streaming project depends on "flink-clients" (and not the other way around)
        // we currently need to intercept the data set environment and create a dependent stream env.
        // this should be fixed once we rework the project dependencies

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (env instanceof ContextEnvironment) {
            return new StreamContextEnvironment((ContextEnvironment) env);
        } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
            return new StreamPlanEnvironment(env);
        } else {
            return createLocalEnvironment();
        }
    }

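When creating a StreamExecutionEnvironment, the method first checks whether a contextEnvironmentFactory exists; if so, the environment is created by that factory and returned directly. Otherwise it is derived from ExecutionEnvironment: a ContextEnvironment yields a StreamContextEnvironment, an OptimizerPlanEnvironment or PreviewPlanEnvironment yields a StreamPlanEnvironment, and anything else falls through to createLocalEnvironment().

In local mode, createLocalEnvironment() is called to create a LocalStreamEnvironment.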

    public static LocalStreamEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }

    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(parallelism, new Configuration());
    }

    public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
        final LocalStreamEnvironment currentEnvironment;

        currentEnvironment = new LocalStreamEnvironment(configuration);
        currentEnvironment.setParallelism(parallelism);

        return currentEnvironment;
    }

The overloads delegate one to the next and finally instantiate and return a LocalStreamEnvironment.
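The selection logic and the overload chain can be reduced to a small pattern: use a context-provided factory if one is registered, otherwise fall back to a local environment built with default parallelism. The sketch below is a simplified stand-in, not Flink's code; the Env class, the Supplier-based contextFactory field and the default of one task per available core are assumptions made for this illustration.

```java
import java.util.function.Supplier;

public class EnvironmentSelectionSketch {

    // Hypothetical stand-in for an execution environment.
    static class Env {
        final String kind;
        final int parallelism;

        Env(String kind, int parallelism) {
            this.kind = kind;
            this.parallelism = parallelism;
        }
    }

    // Registered by a surrounding context (for example a client); null when run from an IDE.
    static Supplier<Env> contextFactory = null;

    // Assumed default for this sketch: one parallel task per available core.
    static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

    static Env getExecutionEnvironment() {
        if (contextFactory != null) {
            return contextFactory.get();      // the context decides
        }
        return createLocalEnvironment();      // fallback: local mode
    }

    // Each overload fills in one default and delegates to the next.
    static Env createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalParallelism);
    }

    static Env createLocalEnvironment(int parallelism) {
        return new Env("local", parallelism);
    }

    public static void main(String[] args) {
        System.out.println(getExecutionEnvironment().kind);
        contextFactory = () -> new Env("context", 4);
        System.out.println(getExecutionEnvironment().kind);
    }
}
```

This is why the same example program runs unchanged in an IDE and on a cluster: only the environment returned by the static getter differs.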

LocalStreamEnvironment

public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);

        if (!configuration.contains(RestOptions.PORT)) {
            configuration.setInteger(RestOptions.PORT, 0);
        }

        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        MiniCluster miniCluster = new MiniCluster(cfg);

        try {
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }

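The code as a whole consists of the following steps:

  • Create the streamGraph

  • Create the jobGraph from the streamGraph

  • Create the Configuration: start from the job configuration, then apply (and override with) the user-defined settings

  • Create the MiniClusterConfiguration and set the number of slots per TaskManager with setNumSlotsPerTaskManager

  • Create and start the miniCluster

  • Run the jobGraph through miniCluster.executeJobBlocking

Note: the jobGraph is the directed acyclic graph (DAG) that we run on the miniCluster to obtain the result.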
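The order in which the configuration is assembled matters: later writes win. The following sketch mimics that layering with plain maps. The key strings and the effectiveConfig helper are illustrative choices for this sketch rather than Flink's API, and the value 0 for the REST port is assumed to mean "pick any free port".

```java
import java.util.HashMap;
import java.util.Map;

public class LocalConfigSketch {

    // Key names chosen for this sketch only.
    static final String REST_PORT = "rest.port";
    static final String MANAGED_MEMORY = "managed.memory";

    // Hypothetical helper mirroring the layering order used in execute().
    static Map<String, String> effectiveConfig(Map<String, String> jobConfig,
                                               Map<String, String> userConfig) {
        Map<String, String> cfg = new HashMap<>(jobConfig); // 1. job configuration
        cfg.put(MANAGED_MEMORY, "0");                       // 2. local-mode default
        cfg.putAll(userConfig);                             // 3. user settings override both
        cfg.putIfAbsent(REST_PORT, "0");                    // 4. only fill the port if still unset
        return cfg;
    }

    public static void main(String[] args) {
        Map<String, String> user = Map.of(MANAGED_MEMORY, "64m");
        System.out.println(effectiveConfig(Map.of("job.name", "demo"), user));
    }
}
```

Because the user settings are applied after the local defaults, a value passed to the LocalStreamEnvironment constructor always takes precedence over what execute() sets on its own.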

MiniCluster

public void start() throws Exception {
        synchronized (lock) {
            checkState(!running, "FlinkMiniCluster is already running");

            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", miniClusterConfiguration);
            // ----- read the configuration -----
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
            final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
            final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

            try {
                // ----- initialize the IO format classes -----
                initializeIOFormatClasses(configuration);
                // ----- create the MetricRegistry and instantiate jobManagerMetricGroup -----
                LOG.info("Starting Metrics Registry");
                metricRegistry = createMetricRegistry(configuration);
                this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                    metricRegistry,
                    "localhost",
                    ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

                final RpcService jobManagerRpcService;
                final RpcService resourceManagerRpcService;
                final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

                // bring up all the RPC services
                LOG.info("Starting RPC Service(s)");

                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
                // ----- start the ActorSystem for the metric query service -----
                // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
                metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
                    configuration,
                    commonRpcService.getAddress(),
                    LOG);
                metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
                // ----- set up jobManagerRpcService, resourceManagerRpcService and taskManagerRpcServices -----
                if (useSingleRpcService) {
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = commonRpcService;
                    }

                    jobManagerRpcService = commonRpcService;
                    resourceManagerRpcService = commonRpcService;

                    this.resourceManagerRpcService = null;
                    this.jobManagerRpcService = null;
                    this.taskManagerRpcServices = null;
                }
                else {
                    // start a new service per component, possibly with custom bind addresses
                    final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                    final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                    final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

                    jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                    resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcServices[i] = createRpcService(
                                configuration, rpcTimeout, true, taskManagerBindAddress);
                    }

                    this.jobManagerRpcService = jobManagerRpcService;
                    this.taskManagerRpcServices = taskManagerRpcServices;
                    this.resourceManagerRpcService = resourceManagerRpcService;
                }
                // ----- create the HA services -----
                // create the high-availability services
                LOG.info("Starting high-availability services");
                haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                    configuration,
                    commonRpcService.getExecutor());
                // ----- create and start the blob server -----
                blobServer = new BlobServer(configuration, haServices.createBlobStore());
                blobServer.start();
                // ----- create the heartbeat services -----
                heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
                // ----- start the ResourceManager -----
                // bring up the ResourceManager(s)
                LOG.info("Starting ResourceManger");
                resourceManagerRunner = startResourceManager(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    resourceManagerRpcService,
                    new ClusterInformation("localhost", blobServer.getPort()),
                    jobManagerMetricGroup);
                // ----- create the BlobCacheService -----
                blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
                );
                // ----- start the TaskManagers -----
                // bring up the TaskManager(s) for the mini cluster
                LOG.info("Starting {} TaskManger(s)", numTaskManagers);
                taskManagers = startTaskManagers(
                    configuration,
                    haServices,
                    heartbeatServices,
                    metricRegistry,
                    blobCacheService,
                    numTaskManagers,
                    taskManagerRpcServices);

                // ----- start the dispatcher REST endpoint -----
                LOG.info("Starting dispatcher rest endpoint.");
                dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    20,
                    Time.milliseconds(20L));
                final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                    jobManagerRpcService,
                    ResourceManagerGateway.class,
                    ResourceManagerId::fromUuid,
                    20,
                    Time.milliseconds(20L));

                this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                    RestServerEndpointConfiguration.fromConfiguration(configuration),
                    dispatcherGatewayRetriever,
                    configuration,
                    RestHandlerConfiguration.fromConfiguration(configuration),
                    resourceManagerGatewayRetriever,
                    blobServer.getTransientBlobService(),
                    WebMonitorEndpoint.createExecutorService(
                        configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                        configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                        "DispatcherRestEndpoint"),
                    new AkkaQueryServiceRetriever(
                        metricQueryServiceActorSystem,
                        Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                    haServices.getWebMonitorLeaderElectionService(),
                    new ShutDownFatalErrorHandler());

                dispatcherRestEndpoint.start();

                restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

                // ----- create the HistoryServerArchivist used by the dispatcher -----
                // bring up the dispatcher that launches JobManagers when jobs submitted
                LOG.info("Starting job dispatcher(s) for JobManger");

                final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
                // ----- instantiate and start the StandaloneDispatcher -----
                dispatcher = new StandaloneDispatcher(
                    jobManagerRpcService,
                    Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                    configuration,
                    haServices,
                    resourceManagerRunner.getResourceManageGateway(),
                    blobServer,
                    heartbeatServices,
                    jobManagerMetricGroup,
                    metricRegistry.getMetricQueryServicePath(),
                    new MemoryArchivedExecutionGraphStore(),
                    Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                    new ShutDownFatalErrorHandler(),
                    dispatcherRestEndpoint.getRestBaseUrl(),
                    historyServerArchivist);

                dispatcher.start();
                // ----- obtain the ResourceManager and dispatcher leader retrievers and start them -----
                resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
                dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

                resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            }
            catch (Exception e) {
                // cleanup everything
                try {
                    close();
                } catch (Exception ee) {
                    e.addSuppressed(ee);
                }
                throw e;
            }

            // create a new termination future
            terminationFuture = new CompletableFuture<>();

            // now officially mark this as running
            running = true;

            LOG.info("Flink Mini Cluster started successfully");
        }
    }

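MiniCluster.start() does quite a lot. The specific steps are:

  • Read the configuration

  • Initialize the IO format classes

  • Create the MetricRegistry and instantiate jobManagerMetricGroup

  • Start the RPC services

  • Start the HA services

  • Start the blob server and create the heartbeat services

  • Start the ResourceManager

  • Start the TaskManagers

  • Start the dispatcher REST endpoint

  • Start the dispatcher, which launches JobManagers when jobs are submitted

  • Obtain resourceManagerLeaderRetriever and dispatcherLeaderRetriever and start them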
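Apart from the individual services, start() follows a recognizable pattern: bring components up in a fixed order, and if any step fails, close whatever was already started and attach cleanup errors to the original exception with addSuppressed. The sketch below illustrates that pattern with plain Java. The Service interface, the named helper and the reverse-order shutdown are assumptions of this sketch, not taken from the Flink source.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class OrderedStartupSketch {

    // Hypothetical component abstraction for this sketch.
    interface Service extends AutoCloseable {
        void start() throws Exception;
    }

    private final Deque<Service> started = new ArrayDeque<>();
    private boolean running = false;

    synchronized void start(List<Service> servicesInOrder) throws Exception {
        if (running) {
            throw new IllegalStateException("already running");
        }
        try {
            for (Service s : servicesInOrder) {
                s.start();
                started.push(s);          // remember what needs cleanup
            }
        } catch (Exception e) {
            try {
                close();                  // clean up everything started so far
            } catch (Exception ee) {
                e.addSuppressed(ee);      // keep the original failure as the main error
            }
            throw e;
        }
        running = true;                   // only mark as running after full success
    }

    synchronized void close() throws Exception {
        Exception first = null;
        while (!started.isEmpty()) {
            try {
                started.pop().close();    // most recently started service first
            } catch (Exception e) {
                if (first == null) first = e; else first.addSuppressed(e);
            }
        }
        running = false;
        if (first != null) throw first;
    }

    synchronized boolean isRunning() {
        return running;
    }

    // Hypothetical helper: a service that records its lifecycle calls in a log.
    static Service named(String name, boolean failOnStart, List<String> log) {
        return new Service() {
            @Override public void start() {
                if (failOnStart) throw new IllegalStateException(name + " failed to start");
                log.add("start " + name);
            }
            @Override public void close() {
                log.add("close " + name);
            }
        };
    }
}
```

If, say, the third of three services throws in start(), the first two are closed again and the caller sees the original exception, which matches the catch block at the end of MiniCluster.start().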

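Summary

A simplified description of the whole flow:

  • Obtain the matching StreamExecutionEnvironment object: LocalStreamEnvironment

  • Call the execute method of that StreamExecutionEnvironment

    • Get the streamGraph

    • Get the jobGraph

    • Instantiate the miniCluster

    • Pass the jobGraph to run to miniCluster.executeJobBlocking

  • Start the miniCluster to execute the job

    Starting it brings up the required services (RPC, HA, ResourceManager, TaskManagers, and so on). Within execute, miniCluster.start() runs before executeJobBlocking is called.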

