【Flink源码】从StreamExecutionEnvironment.execute看Flink提交过程
admin
2024-01-20 06:37:11
0
env.execute("Order Count");

相信大家对这一行代码都不陌生,其作用是执行 Flink 程序,相当于是一个总开关。
很难想象,那么复杂的 Flink 架构,那么复杂的 Flink 程序仅仅需要这简单的一个函数就能启动,其背后究竟是怎样的过程?


execute 与 Flink 执行原理

StreamExecutionEnvironment.java

public JobExecutionResult execute() throws Exception {return execute((String) null);
}

execute 方法的参数为 jobName,若未指定则自动赋为 null

/**
* Triggers the program execution. The environment will execute all parts of the program that
* have resulted in a "sink" operation. Sink operations are for example printing results or
* forwarding them to a message queue.
*
* 

The program execution will be logged and displayed with the provided name * * @param jobName Desired name of the job * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception which occurs during job execution. */ public JobExecutionResult execute(String jobName) throws Exception {final List> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {Optional clusterDatasetCorruptedException =ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);if (!clusterDatasetCorruptedException.isPresent()) {throw t;}// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);} }

这一大段注释的大意是触发程序执行,环境将执行导致 sink 操作的程序的所有部分。
该方法首先通过 getStreamGraph 方法获取了 StreamGraph 对象。

public StreamGraph getStreamGraph() {return getStreamGraph(true);
}public StreamGraph getStreamGraph(boolean clearTransformations) {final StreamGraph streamGraph = getStreamGraph(transformations);if (clearTransformations) {transformations.clear();}return streamGraph;
}

由源码可知该方法的主要作用是获取流的执行图,若参数 clearTransformations 为 true(默认为 true)则清空 transformations。
这里的 transformations 是一个 List 的对象,包含一系列流的转换操作,而 Transformation 本身是一个抽象类,用于完成从输入流到输出流的转换,也就是我们常用的 map、filter 等转换算子其底层都是一棵 Transformation 树。任何一个 Flink 程序只要包含流的输入与输出都会存在一棵 Transformation 树。
Flink 程序会基于 Transformation 列表将其转化为 StreamGraph
这里清空 transformations 也就是清除这棵转换树。在完成了到 StreamGraph 的转换后清除树。
我们再继续往下看调用的 getStreamGraph(List> transformations)

private StreamGraph getStreamGraph(List> transformations) {synchronizeClusterDatasetStatus();return getStreamGraphGenerator(transformations).generate();
}private void synchronizeClusterDatasetStatus() {if (cachedTransformations.isEmpty()) {return;}Set completedClusterDatasets =listCompletedClusterDatasets().stream().map(AbstractID::new).collect(Collectors.toSet());cachedTransformations.forEach((id, transformation) -> {transformation.setCached(completedClusterDatasets.contains(id));});
}

synchronizeClusterDatasetStatus 顾名思义,同步集群数据集状态。
其中,cachedTransformations 是一个 Map> 变量,表示集群中各个数据集的缓存转换算子。synchronizeClusterDatasetStatus 方法就是将已完成算子到执行图转化的数据集列表与缓存列表同步。
接下来,调用 getStreamGraphGenerator 生成执行图。

public StreamGraph generateStreamGraph(List> transformations) {return getStreamGraphGenerator(transformations).generate();
}private StreamGraphGenerator getStreamGraphGenerator(List> transformations) {if (transformations.size() <= 0) {throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");}// We copy the transformation so that newly added transformations cannot intervene with the// stream graph generation.return new StreamGraphGenerator(new ArrayList<>(transformations), config, checkpointCfg, configuration).setStateBackend(defaultStateBackend).setChangelogStateBackendEnabled(changelogStateBackendEnabled).setSavepointDir(defaultSavepointDirectory).setChaining(isChainingEnabled).setUserArtifacts(cacheFile).setTimeCharacteristic(timeCharacteristic).setDefaultBufferTimeout(bufferTimeout).setSlotSharingGroupResource(slotSharingGroupResources);
}

至此,我们终于找到了真正生成执行图的类 StreamGraphGenerator。这个我们稍后再说。
相信读者看到这里可能都忘了我们开始的地方,现在我们回到最初的 execute() 方法。

public JobExecutionResult execute(String jobName) throws Exception {final List> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {Optional clusterDatasetCorruptedException =ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);if (!clusterDatasetCorruptedException.isPresent()) {throw t;}// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);}
}

在完成了执行图的生成后,调用 execute(streamGraph),将执行图赋给执行程序,并在出错后重新获取执行图再次执行。
接下来我们继续看 execute(streamGraph)

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {final JobClient jobClient = executeAsync(streamGraph);try {final JobExecutionResult jobExecutionResult;if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {jobExecutionResult = jobClient.getJobExecutionResult().get();} else {jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());}jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));return jobExecutionResult;} catch (Throwable t) {// get() on the JobExecutionResult Future will throw an ExecutionException. This// behaviour was largely not there in Flink versions before the PipelineExecutor// refactoring so we should strip that exception.Throwable strippedException = ExceptionUtils.stripExecutionException(t);jobListeners.forEach(jobListener -> {jobListener.onJobExecuted(null, strippedException);});ExceptionUtils.rethrowException(strippedException);// never reached, only make javac happyreturn null;}
}

这个方法做了两件事:

  • 调用真正执行的方法 executeAsync(streamGraph)返回 JobClient
  • 针对执行结果,通过 jobClient.getJobExecutionResult().get() 获取

这里特别要提一下,JobClient 接口是任务执行的起点,负责接受用户的程序代码,然后创建数据流,将数据流提交给 JobManager 以便进一步执行。执行完成后,将结果返回给用户。这里就是通过 JobClient 取出执行结果 JobExecutionResult 对象。

不知道你是否注意到,任务完成后会执行 jobListeners 的 forEach 操作。jobListener 是 List 变量。
关于 JobListner 接口,源码注释如下:

/*** A listener that is notified on specific job status changed, which should be firstly registered by* {@code #registerJobListener} of execution environments.** 

It is highly recommended NOT to perform any blocking operation inside the callbacks. If you* block the thread the invoker of environment execute methods is possibly blocked.*/ @PublicEvolving public interface JobListener {

大意是在特定作业状态更改时被通知的侦听器,在 StreamExecutionEnvironment 中通过 registrJobListener 方法注册

public void registerJobListener(JobListener jobListener) {checkNotNull(jobListener, "JobListener cannot be null");jobListeners.add(jobListener);
}

而在任务执行完成后,会将其置为 null,表示执行 finished。
接下来我们继续看真正执行 execute 操作的 executeAsync(StreamGraph streamGraph) 方法

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {checkNotNull(streamGraph, "StreamGraph cannot be null.");final PipelineExecutor executor = getPipelineExecutor();CompletableFuture jobClientFuture =executor.execute(streamGraph, configuration, userClassloader);try {JobClient jobClient = jobClientFuture.get();jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));collectIterators.clear();return jobClient;} catch (ExecutionException executionException) {final Throwable strippedException =ExceptionUtils.stripExecutionException(executionException);jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()),strippedException);}
}

兜了一大圈,终于找到真正执行的方法,异步方法。
PipelineExecutor 按源码注释的解释是负责用户作业执行的实体,它由PipelineFactory 根据配置中确定的 Flink 环境按 yarn、standalone、per-job、local 几种不同情况生产对应的 Pipeline。这一点可以在 getPipelineExecutor 方法中得到证实

private PipelineExecutor getPipelineExecutor() throws Exception {checkNotNull(configuration.get(DeploymentOptions.TARGET),"No execution.target specified in your configuration file.");final PipelineExecutorFactory executorFactory =executorServiceLoader.getExecutorFactory(configuration);checkNotNull(executorFactory,"Cannot find compatible factory for specified execution.target (=%s)",configuration.get(DeploymentOptions.TARGET));return executorFactory.getExecutor(configuration);
}

获取对应环境的 PipelineExecutor 后调用接口中的 execute 方法执行,并将执行图、配置、类加载器作为参数传入
要想进一步搞清执行逻辑,我们必须继续深入探究 Pipeline.execute 的执行逻辑。
我们再官方文档中找到继承 Pipeline 接口的类有 AbstractJobClusterExecutor, AbstractSessionClusterExecutor, EmbeddedExecutor, KubernetesSessionClusterExecutor, LocalExecutor, RemoteExecutor, YarnJobClusterExecutor, YarnSessionClusterExecutor
下面我们就本地执行为例,探究 LocalExecutor 执行原理

public CompletableFuture execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)throws Exception {checkNotNull(pipeline);checkNotNull(configuration);Configuration effectiveConfig = new Configuration();effectiveConfig.addAll(this.configuration);effectiveConfig.addAll(configuration);// we only support attached execution with the local executor.checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

该方法执行流程为:

  • 将实例化 LocalExecutor 时添加的额外配置和用户配置合并为一个 Configuration
  • 创建 JobGraph,作业执行图
  • 调用 createWithFactory 和 submitJob 提交任务

这里的 JobGraph 是由 StreamGraph 转化而来,转化过程看 getJobGraph 方法
关于 JobGraph 的获取方法,我们留待后面讨论
这里 PerJobMiniClusterFactory.createWithFactory 创建了一个 PerJobMiniClusterFactory 对象
submitJob 开始了一个 MiniCluster 并提交了一个任务,具体代码如下:

public CompletableFuture submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {MiniClusterConfiguration miniClusterConfig =getMiniClusterConfig(jobGraph.getMaximumParallelism());MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);miniCluster.start();return miniCluster.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(() ->miniCluster.getJobStatus(submissionResult.getJobID()).get(),() ->miniCluster.requestJobResult(submissionResult.getJobID()).get(),userCodeClassloader);return submissionResult;})).thenApply(result ->new MiniClusterJobClient(result.getJobID(),miniCluster,userCodeClassloader,MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER)).whenComplete((ignored, throwable) -> {if (throwable != null) {// We failed to create the JobClient and must shutdown to ensure// cleanup.shutDownCluster(miniCluster);}}).thenApply(Function.identity());
}

至此,我们总算明白了,execute 到最后是开启了一个 MiniCluster 并将 JobGraph 作为参数提交任务。
而 MiniCluster 在官方文档上的解释为 本地执行 Flink jobs 的 mini 集群。

总结:

  • execute 的执行过程:
  • 转换 Transformation 为 StreamGraph
  • 提供执行需要的额外配置、监听方法等
  • 将 StreamGraph 转换为可执行的 JobGraph
  • 根据运行环境的不同创建不同的执行器
  • 在本地环境下,开启一个 MiniCluster 并将 JobGraph 提交任务执行

相关内容

热门资讯

原创 4... 写在文章前的声明:在本文之前的说明:本文中所列的投资信息,只是一个对基金资产净值进行排行的客观描述,...
胜宏科技港股大涨49% 做完英... 记者 陈月芹 4月21日,全球AI算力板龙头胜宏科技(02476.HK)登陆港交所,上市首日股价大涨...
永赢基金:聚焦“科技新锐”,科... 数据来源:Wind,时间统计区间为2025/1/1-2026/4/21,指数过往表现不预示未来,不构...
五大阅读趋势显现!当当网发布2... 在第31个世界读书日即将来临之际及首个全民阅读活动周期间,当当网正式发布2026国民阅读洞察报告。 ...
业绩逐季回暖 老百姓大药房一季... 上证报中国证券网讯(记者 夏子航)4月22日晚,老百姓大药房发布2025年年报和2026年一季报。今...
中国20强城市大洗牌:苏州接近... 中国的城市经济竞争格局一直在变化,每年发布的GDP数据都会对城市经济实力进行重新排列。2025年榜又...
直击金宏气体股东会:预期年内氦... 《科创板日报》4月22日讯(记者 郭辉)金宏气体日前举行2025年度股东大会。会上该公司审议了公司年...
5月1日起,俄据悉将叫停哈萨克... 据行业消息人士透露,俄罗斯将于5月1日起停止经友谊管道转运哈萨克斯坦输往德国的石油,相关调整计划已送...
深化具身智能生态布局 京东携手... 4 月 22 日,京东与国内消费级人形机器人头部企业松延动力正式达成三年期战略合作。双方将围绕产品研...
原创 帮... 先问你一个问题,美伊停火今晚到期,按常理避险情绪该升温,黄金应该涨吧?结果恰恰相反——原油涨了,黄金...
300295、600889,将... 三六五网、南京化纤,将被*ST。 公司股票自4月23日开市起停牌一天,于4月24日开市起复牌并实施退...
能源大变天!外媒:羡慕中国的石... 这一次油价突破 110 美元的能源危机,着实魔幻。如果放在十年前,没人会相信中国能在这场风波中获利,...
黄金涨跌两难,现在还能上车吗? 中新网4月22日电(记者 左雨晴) 四月以来,美伊局势反复拉扯,美联储降息预期一变再变。黄金价格在4...
“我身体健康”,库克现身员工大... 当地时间4月21日,受苹果官宣CEO换届影响,公司股价盘中下探超2%,总市值失守4万亿美元关口,收盘...
库克留下一个悬念 工程师能否拯救创新节奏? 听筒Tech(ID:tingtongtech)原创 文 | 赵 森 ...
探索消费信贷与社交支付深度融合... 腾讯这一金融产品再添新功能,4月19日,北京商报记者注意到,微信分付灰度测试转账功能引发热议,在向微...
土耳其主要银行股指早盘下跌2% 每经AI快讯,4月20日,土耳其主要银行股指早盘下跌2%。 每日经济新闻
好用的OTA代运营源头厂家 在如今竞争激烈的酒旅行业中,OTA代运营服务成为了众多酒店、民宿提升竞争力的关键。但市场上的代运营厂...
成都五一出游全国热门第三 “五一”假期临近,同程旅行最新发布的《2026“五一”旅行趋势报告》显示,今年“五一”期间成都同时位...