WeChat official account: 深广大数据Club
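Following the previous article, 《Flink源码解析 | 从Example出发理解Flink-Flink启动》 (Flink source code analysis: understanding Flink startup from the examples), this article walks through the implementation of SocketWindowWordCount, one of the examples shipped with Apache Flink.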
SocketWindowWordCount
Let's start with the key part of the SocketWindowWordCount code:
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
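The code above breaks down into the following parts:
- Get the execution environment
- Create the DataStream object from the socket
- Process the data to obtain windowCounts
  - Map: flatMap splits each line into (word, 1) records
  - Transformation: keyBy groups the records by word
  - Window: timeWindow collects each key's records into 5-second windows
  - reduce sums the counts per word within each window
- Print the result
- Call env.execute to run the job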
The FlatMapFunction and ReduceFunction used here can be replaced with your own implementations to fit your business scenario.
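To make the data flow concrete, here is a minimal plain-Java sketch of what the flatMap, keyBy, timeWindow and reduce chain computes. It does not use the Flink API: the class `WindowWordCountSketch`, the explicit timestamp array and the `"window|word"` key format are illustrative assumptions, and windows are assumed to be aligned to multiples of the 5-second window size.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model of flatMap -> keyBy -> 5 s tumbling window -> reduce (not Flink API). */
final class WindowWordCountSketch {

    static final long WINDOW_MS = 5_000L;

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /**
     * Counts words per (window, word). timestampsMs[i] is the arrival time of lines[i].
     * Result keys have the hypothetical form "windowStart|word", for example "5000|hello".
     */
    static Map<String, Long> count(long[] timestampsMs, String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < lines.length; i++) {
            long window = windowStart(timestampsMs[i]);
            // flatMap: one (word, 1) record per whitespace-separated token
            for (String word : lines[i].split("\\s")) {
                // keyBy + window: records with the same word and window share one bucket
                String key = window + "|" + word;
                // reduce: mirrors new WordWithCount(a.word, a.count + b.count)
                counts.merge(key, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] times = {1_000L, 4_000L, 6_000L};
        String[] lines = {"hello flink", "hello", "hello flink"};
        count(times, lines).forEach((key, value) -> System.out.println(key + " : " + value));
    }
}
```

The first two lines fall into the window starting at 0 ms and the third into the window starting at 5000 ms, so "hello" is counted separately for each window, which is the behaviour the 5-second timeWindow gives per key.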
Instantiating StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }
    // because the streaming project depends on "flink-clients" (and not the other way around)
    // we currently need to intercept the data set environment and create a dependent stream env.
    // this should be fixed once we rework the project dependencies
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
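When a StreamExecutionEnvironment is created, the method first checks whether a contextEnvironmentFactory exists. If it does, the environment is created by that factory and returned directly. Otherwise the result is derived from ExecutionEnvironment: a ContextEnvironment is wrapped in a StreamContextEnvironment, an OptimizerPlanEnvironment or PreviewPlanEnvironment yields a StreamPlanEnvironment, and anything else falls through to a local environment.
In local mode, createLocalEnvironment() is called to create the stream environment.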
public static LocalStreamEnvironment createLocalEnvironment() {
    return createLocalEnvironment(defaultLocalParallelism);
}
public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
    return createLocalEnvironment(parallelism, new Configuration());
}
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
    final LocalStreamEnvironment currentEnvironment;
    currentEnvironment = new LocalStreamEnvironment(configuration);
    currentEnvironment.setParallelism(parallelism);
    return currentEnvironment;
}
The overloads delegate to one another, and the chain ends by instantiating and returning a LocalStreamEnvironment.
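The selection logic can be illustrated with a small plain-Java model. This is only a sketch of the "use the injected factory if present, otherwise fall back to a local environment" pattern, not Flink code: `EnvSelectionSketch`, `Env`, `contextFactory` and the method names are hypothetical, and defaulting the parallelism to the number of available processors is an assumption made for the example.

```java
import java.util.function.Supplier;

/** Plain-Java model of the factory-or-local fallback; every name here is hypothetical. */
final class EnvSelectionSketch {

    /** Stand-in for an execution environment; it only records its kind and parallelism. */
    static final class Env {
        final String kind;
        final int parallelism;

        Env(String kind, int parallelism) {
            this.kind = kind;
            this.parallelism = parallelism;
        }
    }

    /** Would be set by a client that submits the program; stays null for a plain local run. */
    static Supplier<Env> contextFactory = null;

    /** Assumed default for the sketch: one parallel instance per available processor. */
    static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

    static Env getEnv() {
        if (contextFactory != null) {
            // an injected factory always wins
            return contextFactory.get();
        }
        // nothing injected: fall back to the local environment
        return createLocal();
    }

    /** Shortest overload: fills in the default parallelism and delegates. */
    static Env createLocal() {
        return createLocal(defaultLocalParallelism);
    }

    /** Final overload in the chain: actually builds the environment. */
    static Env createLocal(int parallelism) {
        return new Env("local", parallelism);
    }
}
```

Running the same program from an IDE and through a submitting client therefore needs no code change: only the presence of the factory differs.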
LocalStreamEnvironment
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);
    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);
    if (!configuration.contains(RestOptions.PORT)) {
        configuration.setInteger(RestOptions.PORT, 0);
    }
    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();
    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }
    MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
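The code as a whole breaks down into the following steps:
- Create the streamGraph
- Create the jobGraph from the streamGraph
- Create the Configuration
- Create the MiniClusterConfiguration and set the number of slots per TaskManager with setNumSlotsPerTaskManager
- Create the miniCluster
- Execute the jobGraph through miniCluster.executeJobBlocking
Note: the jobGraph is the directed acyclic graph (DAG) that is handed to the miniCluster to run and produce the result.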
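The order in which the Configuration is assembled matters: job settings first, then a default, then the user's settings on top, and finally a REST port only if nobody set one. The following plain-Java sketch models that layering with ordinary maps. It is an illustration under assumptions, not Flink's Configuration class: `LocalConfigSketch` and the three `example.*` keys are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the configuration layering in execute(); keys are hypothetical. */
final class LocalConfigSketch {

    static final String MANAGED_MEMORY_KEY = "example.managed-memory";
    static final String REST_PORT_KEY = "example.rest-port";
    static final String SLOTS_KEY = "example.task-slots";

    /** Builds the effective settings: job config, then a default, then user overrides. */
    static Map<String, String> effectiveConfig(Map<String, String> jobConfig,
                                               Map<String, String> userConfig) {
        Map<String, String> merged = new HashMap<>(jobConfig);
        // default that a later user setting may still override
        merged.put(MANAGED_MEMORY_KEY, "0");
        // user-defined settings are added last, so they win on conflicts
        merged.putAll(userConfig);
        // only choose a port if none was configured; "0" stands for "pick any free port"
        merged.putIfAbsent(REST_PORT_KEY, "0");
        return merged;
    }

    /** Uses the configured slot count if present, otherwise the job's maximum parallelism. */
    static int slotsPerTaskManager(Map<String, String> config, int maxParallelism) {
        String configured = config.get(SLOTS_KEY);
        return configured != null ? Integer.parseInt(configured) : maxParallelism;
    }
}
```

Falling back to the maximum parallelism means a single local TaskManager has enough slots to run every parallel subtask of the job when the user has not configured a slot count.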
MiniCluster
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "FlinkMiniCluster is already running");
        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);
        // ----- read the configuration -----
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
        try {
            // ----- initialize the IO format classes -----
            initializeIOFormatClasses(configuration);
            // ----- create the MetricRegistry and instantiate jobManagerMetricGroup -----
            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);
            this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                "localhost",
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
            final RpcService jobManagerRpcService;
            final RpcService resourceManagerRpcService;
            final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");
            // we always need the 'commonRpcService' for auxiliary calls
            commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
            // ----- start the ActorSystem for the metric query service -----
            // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
            metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
                configuration,
                commonRpcService.getAddress(),
                LOG);
            metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
            // ----- set up jobManagerRpcService, resourceManagerRpcService and taskManagerRpcServices -----
            if (useSingleRpcService) {
                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = commonRpcService;
                }
                jobManagerRpcService = commonRpcService;
                resourceManagerRpcService = commonRpcService;
                this.resourceManagerRpcService = null;
                this.jobManagerRpcService = null;
                this.taskManagerRpcServices = null;
            }
            else {
                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
                jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = createRpcService(
                        configuration, rpcTimeout, true, taskManagerBindAddress);
                }
                this.jobManagerRpcService = jobManagerRpcService;
                this.taskManagerRpcServices = taskManagerRpcServices;
                this.resourceManagerRpcService = resourceManagerRpcService;
            }
            // ----- create the HA services -----
            // create the high-availability services
            LOG.info("Starting high-availability services");
            haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                configuration,
                commonRpcService.getExecutor());
            // ----- create and start the blob server -----
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();
            // ----- create the heartbeat services -----
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
            // ----- start the ResourceManager -----
            // bring up the ResourceManager(s)
            LOG.info("Starting ResourceManger");
            resourceManagerRunner = startResourceManager(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                resourceManagerRpcService,
                new ClusterInformation("localhost", blobServer.getPort()),
                jobManagerMetricGroup);
            // ----- create the BlobCacheService -----
            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );
            // ----- start the TaskManagers -----
            // bring up the TaskManager(s) for the mini cluster
            LOG.info("Starting {} TaskManger(s)", numTaskManagers);
            taskManagers = startTaskManagers(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                numTaskManagers,
                taskManagerRpcServices);
            // ----- start the dispatcher REST endpoint -----
            // starting the dispatcher rest endpoint
            LOG.info("Starting dispatcher rest endpoint.");
            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                RestServerEndpointConfiguration.fromConfiguration(configuration),
                dispatcherGatewayRetriever,
                configuration,
                RestHandlerConfiguration.fromConfiguration(configuration),
                resourceManagerGatewayRetriever,
                blobServer.getTransientBlobService(),
                WebMonitorEndpoint.createExecutorService(
                    configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                    configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                    "DispatcherRestEndpoint"),
                new AkkaQueryServiceRetriever(
                    metricQueryServiceActorSystem,
                    Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                haServices.getWebMonitorLeaderElectionService(),
                new ShutDownFatalErrorHandler());
            dispatcherRestEndpoint.start();
            restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
            // ----- create the HistoryServerArchivist -----
            // bring up the dispatcher that launches JobManagers when jobs submitted
            LOG.info("Starting job dispatcher(s) for JobManger");
            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
            // ----- instantiate and start the Dispatcher -----
            dispatcher = new StandaloneDispatcher(
                jobManagerRpcService,
                Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                configuration,
                haServices,
                resourceManagerRunner.getResourceManageGateway(),
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServicePath(),
                new MemoryArchivedExecutionGraphStore(),
                Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                new ShutDownFatalErrorHandler(),
                dispatcherRestEndpoint.getRestBaseUrl(),
                historyServerArchivist);
            dispatcher.start();
            // ----- get the ResourceManager and Dispatcher leader retrievers and start them -----
            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }
        // create a new termination future
        terminationFuture = new CompletableFuture<>();
        // now officially mark this as running
        running = true;
        LOG.info("Flink Mini Cluster started successfully");
    }
}
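MiniCluster.start() does quite a lot. The steps are:
- Read the configuration
- Initialize the IO format classes
- Create the MetricRegistry and instantiate jobManagerMetricGroup
- Start the RPC services
- Start the HA services
- Start the BlobServer and create the heartbeat services
- Start the ResourceManager
- Start the TaskManagers
- Start the dispatcher REST endpoint
- Start the dispatcher that launches JobManagers when jobs are submitted
- Get the ResourceManager and Dispatcher leader retrievers and start them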
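One detail of start() that the step list does not show is its error handling: if any step throws, close() is called to clean up, a failure during cleanup is attached to the original exception with addSuppressed, and the cluster is only marked as running once every step has succeeded. Below is a minimal plain-Java sketch of that pattern. `StartupSketch`, `Service` and the `service(...)` helper are hypothetical, and closing the started services in reverse order is a design choice of this sketch, not something taken from MiniCluster.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of "start in order, clean up on failure"; all names are hypothetical. */
final class StartupSketch {

    interface Service {
        void start() throws Exception;
        void close() throws Exception;
    }

    private final Deque<Service> started = new ArrayDeque<>();
    private boolean running = false;

    void start(List<Service> servicesInOrder) throws Exception {
        if (running) {
            throw new IllegalStateException("already running");
        }
        try {
            for (Service service : servicesInOrder) {
                service.start();
                started.push(service); // remember what needs closing later
            }
        } catch (Exception e) {
            // clean up everything that did start; keep the original failure as the main error
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }
        // only now is the cluster officially running
        running = true;
    }

    void close() throws Exception {
        Exception first = null;
        while (!started.isEmpty()) {
            try {
                started.pop().close(); // reverse start order (a choice made for this sketch)
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        running = false;
        if (first != null) {
            throw first;
        }
    }

    boolean isRunning() {
        return running;
    }

    /** Test helper: a service that logs its lifecycle and optionally fails on start. */
    static Service service(String name, boolean failOnStart, List<String> log) {
        return new Service() {
            @Override
            public void start() {
                if (failOnStart) {
                    throw new IllegalStateException("cannot start " + name);
                }
                log.add("start " + name);
            }

            @Override
            public void close() {
                log.add("close " + name);
            }
        };
    }
}
```

If the third of three services fails to start, the first two are closed again and the caller sees the original exception, so a half-started cluster is never left behind.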
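Summary
A simplified description of the whole flow:
- Create or obtain the matching StreamExecutionEnvironment object, here a LocalStreamEnvironment
- Call the execute method of the StreamExecutionEnvironment object
- Get the streamGraph
- Get the jobGraph
- Instantiate the miniCluster
- Start the miniCluster, which brings up the required services (RPC, HA, ResourceManager, TaskManagers and so on)
- Call miniCluster.executeJobBlocking with the jobGraph to run the job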
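The local execution flow can be condensed into a short plain-Java sketch: derive the graphs, start the cluster, run the job and block for the result, and always close the cluster afterwards. Nothing here is Flink API: `LocalExecuteSketch`, `FakeCluster`, the string stand-ins for the graphs and the `failJob` switch are all hypothetical and exist only to show the try/finally lifecycle.

```java
import java.util.List;

/** Plain-Java model of the local execute() lifecycle; all names are hypothetical. */
final class LocalExecuteSketch {

    /** Stand-in for the mini cluster; it only records the calls made on it. */
    static final class FakeCluster {
        private final List<String> calls;

        FakeCluster(List<String> calls) {
            this.calls = calls;
        }

        void start() {
            calls.add("start");
        }

        String executeJobBlocking(String jobGraph, boolean failJob) {
            calls.add("execute " + jobGraph);
            if (failJob) {
                throw new IllegalStateException("job failed");
            }
            return "result of " + jobGraph;
        }

        void close() {
            calls.add("close");
        }
    }

    static String execute(String jobName, boolean failJob, List<String> calls) {
        // the streaming program becomes a stream graph, which becomes a job graph
        String streamGraph = "streamGraph(" + jobName + ")";
        String jobGraph = "jobGraph(" + streamGraph + ")";

        FakeCluster cluster = new FakeCluster(calls);
        try {
            cluster.start();
            return cluster.executeJobBlocking(jobGraph, failJob);
        } finally {
            // runs on success and on failure, so the embedded cluster never leaks
            cluster.close();
        }
    }
}
```

Because the cluster is created inside execute and closed in the finally block, each local run gets a fresh embedded cluster that lives exactly as long as the job.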