【Flink源码】从StreamExecutionEnvironment.execute看Flink提交过程
admin
2024-01-20 06:37:11
0
env.execute("Order Count");

相信大家对这一行代码都不陌生,其作用是执行 Flink 程序,相当于是一个总开关。
很难想象,那么复杂的 Flink 架构,那么复杂的 Flink 程序仅仅需要这简单的一个函数就能启动,其背后究竟是怎样的过程?


execute 与 Flink 执行原理

StreamExecutionEnvironment.java

public JobExecutionResult execute() throws Exception {return execute((String) null);
}

execute 方法的参数为 jobName,若未指定则自动赋为 null

/**
* Triggers the program execution. The environment will execute all parts of the program that
* have resulted in a "sink" operation. Sink operations are for example printing results or
* forwarding them to a message queue.
*
* 

The program execution will be logged and displayed with the provided name * * @param jobName Desired name of the job * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception which occurs during job execution. */ public JobExecutionResult execute(String jobName) throws Exception {final List> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {Optional clusterDatasetCorruptedException =ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);if (!clusterDatasetCorruptedException.isPresent()) {throw t;}// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);} }

这一大段注释的大意是触发程序执行,环境将执行导致 sink 操作的程序的所有部分。
该方法首先通过 getStreamGraph 方法获取了 StreamGraph 对象。

public StreamGraph getStreamGraph() {return getStreamGraph(true);
}public StreamGraph getStreamGraph(boolean clearTransformations) {final StreamGraph streamGraph = getStreamGraph(transformations);if (clearTransformations) {transformations.clear();}return streamGraph;
}

由源码可知该方法的主要作用是获取流的执行图,若参数 clearTransformations 为 true(默认为 true)则清空 transformations。
这里的 transformations 是一个 List 的对象,包含一系列流的转换操作,而 Transformation 本身是一个抽象类,用于完成从输入流到输出流的转换,也就是我们常用的 map、filter 等转换算子其底层都是一棵 Transformation 树。任何一个 Flink 程序只要包含流的输入与输出都会存在一棵 Transformation 树。
Flink 程序会基于 Transformation 列表将其转化为 StreamGraph
这里清空 transformations 也就是清除这棵转换树。在完成了到 StreamGraph 的转换后清除树。
我们再继续往下看调用的 getStreamGraph(List> transformations)

private StreamGraph getStreamGraph(List> transformations) {synchronizeClusterDatasetStatus();return getStreamGraphGenerator(transformations).generate();
}private void synchronizeClusterDatasetStatus() {if (cachedTransformations.isEmpty()) {return;}Set completedClusterDatasets =listCompletedClusterDatasets().stream().map(AbstractID::new).collect(Collectors.toSet());cachedTransformations.forEach((id, transformation) -> {transformation.setCached(completedClusterDatasets.contains(id));});
}

synchronizeClusterDatasetStatus 顾名思义,同步集群数据集状态。
其中,cachedTransformations 是一个 Map> 变量,表示集群中各个数据集的缓存转换算子。synchronizeClusterDatasetStatus 方法就是将已完成算子到执行图转化的数据集列表与缓存列表同步。
接下来,调用 getStreamGraphGenerator 生成执行图。

public StreamGraph generateStreamGraph(List> transformations) {return getStreamGraphGenerator(transformations).generate();
}private StreamGraphGenerator getStreamGraphGenerator(List> transformations) {if (transformations.size() <= 0) {throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");}// We copy the transformation so that newly added transformations cannot intervene with the// stream graph generation.return new StreamGraphGenerator(new ArrayList<>(transformations), config, checkpointCfg, configuration).setStateBackend(defaultStateBackend).setChangelogStateBackendEnabled(changelogStateBackendEnabled).setSavepointDir(defaultSavepointDirectory).setChaining(isChainingEnabled).setUserArtifacts(cacheFile).setTimeCharacteristic(timeCharacteristic).setDefaultBufferTimeout(bufferTimeout).setSlotSharingGroupResource(slotSharingGroupResources);
}

至此,我们终于找到了真正生成执行图的类 StreamGraphGenerator。这个我们稍后再说。
相信读者看到这里可能都忘了我们开始的地方,现在我们回到最初的 execute() 方法。

public JobExecutionResult execute(String jobName) throws Exception {final List> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {Optional clusterDatasetCorruptedException =ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);if (!clusterDatasetCorruptedException.isPresent()) {throw t;}// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);}
}

在完成了执行图的生成后,调用 execute(streamGraph),将执行图赋给执行程序,并在出错后重新获取执行图再次执行。
接下来我们继续看 execute(streamGraph)

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {final JobClient jobClient = executeAsync(streamGraph);try {final JobExecutionResult jobExecutionResult;if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {jobExecutionResult = jobClient.getJobExecutionResult().get();} else {jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());}jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));return jobExecutionResult;} catch (Throwable t) {// get() on the JobExecutionResult Future will throw an ExecutionException. This// behaviour was largely not there in Flink versions before the PipelineExecutor// refactoring so we should strip that exception.Throwable strippedException = ExceptionUtils.stripExecutionException(t);jobListeners.forEach(jobListener -> {jobListener.onJobExecuted(null, strippedException);});ExceptionUtils.rethrowException(strippedException);// never reached, only make javac happyreturn null;}
}

这个方法做了两件事:

  • 调用真正执行的方法 executeAsync(streamGraph)返回 JobClient
  • 针对执行结果,通过 jobClient.getJobExecutionResult().get() 获取

这里特别要提一下,JobClient 接口是任务执行的起点,负责接受用户的程序代码,然后创建数据流,将数据流提交给 JobManager 以便进一步执行。执行完成后,将结果返回给用户。这里就是通过 JobClient 取出执行结果 JobExecutionResult 对象。

不知道你是否注意到,任务完成后会执行 jobListeners 的 forEach 操作。jobListener 是 List 变量。
关于 JobListner 接口,源码注释如下:

/*** A listener that is notified on specific job status changed, which should be firstly registered by* {@code #registerJobListener} of execution environments.** 

It is highly recommended NOT to perform any blocking operation inside the callbacks. If you* block the thread the invoker of environment execute methods is possibly blocked.*/ @PublicEvolving public interface JobListener {

大意是在特定作业状态更改时被通知的侦听器,在 StreamExecutionEnvironment 中通过 registrJobListener 方法注册

public void registerJobListener(JobListener jobListener) {checkNotNull(jobListener, "JobListener cannot be null");jobListeners.add(jobListener);
}

而在任务执行完成后,会将其置为 null,表示执行 finished。
接下来我们继续看真正执行 execute 操作的 executeAsync(StreamGraph streamGraph) 方法

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {checkNotNull(streamGraph, "StreamGraph cannot be null.");final PipelineExecutor executor = getPipelineExecutor();CompletableFuture jobClientFuture =executor.execute(streamGraph, configuration, userClassloader);try {JobClient jobClient = jobClientFuture.get();jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));collectIterators.clear();return jobClient;} catch (ExecutionException executionException) {final Throwable strippedException =ExceptionUtils.stripExecutionException(executionException);jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()),strippedException);}
}

兜了一大圈,终于找到真正执行的方法,异步方法。
PipelineExecutor 按源码注释的解释是负责用户作业执行的实体,它由PipelineFactory 根据配置中确定的 Flink 环境按 yarn、standalone、per-job、local 几种不同情况生产对应的 Pipeline。这一点可以在 getPipelineExecutor 方法中得到证实

private PipelineExecutor getPipelineExecutor() throws Exception {checkNotNull(configuration.get(DeploymentOptions.TARGET),"No execution.target specified in your configuration file.");final PipelineExecutorFactory executorFactory =executorServiceLoader.getExecutorFactory(configuration);checkNotNull(executorFactory,"Cannot find compatible factory for specified execution.target (=%s)",configuration.get(DeploymentOptions.TARGET));return executorFactory.getExecutor(configuration);
}

获取对应环境的 PipelineExecutor 后调用接口中的 execute 方法执行,并将执行图、配置、类加载器作为参数传入
要想进一步搞清执行逻辑,我们必须继续深入探究 Pipeline.execute 的执行逻辑。
我们再官方文档中找到继承 Pipeline 接口的类有 AbstractJobClusterExecutor, AbstractSessionClusterExecutor, EmbeddedExecutor, KubernetesSessionClusterExecutor, LocalExecutor, RemoteExecutor, YarnJobClusterExecutor, YarnSessionClusterExecutor
下面我们就本地执行为例,探究 LocalExecutor 执行原理

public CompletableFuture execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)throws Exception {checkNotNull(pipeline);checkNotNull(configuration);Configuration effectiveConfig = new Configuration();effectiveConfig.addAll(this.configuration);effectiveConfig.addAll(configuration);// we only support attached execution with the local executor.checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

该方法执行流程为:

  • 将实例化 LocalExecutor 时添加的额外配置和用户配置合并为一个 Configuration
  • 创建 JobGraph,作业执行图
  • 调用 createWithFactory 和 submitJob 提交任务

这里的 JobGraph 是由 StreamGraph 转化而来,转化过程看 getJobGraph 方法
关于 JobGraph 的获取方法,我们留待后面讨论
这里 PerJobMiniClusterFactory.createWithFactory 创建了一个 PerJobMiniClusterFactory 对象
submitJob 开始了一个 MiniCluster 并提交了一个任务,具体代码如下:

public CompletableFuture submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {MiniClusterConfiguration miniClusterConfig =getMiniClusterConfig(jobGraph.getMaximumParallelism());MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);miniCluster.start();return miniCluster.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(() ->miniCluster.getJobStatus(submissionResult.getJobID()).get(),() ->miniCluster.requestJobResult(submissionResult.getJobID()).get(),userCodeClassloader);return submissionResult;})).thenApply(result ->new MiniClusterJobClient(result.getJobID(),miniCluster,userCodeClassloader,MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER)).whenComplete((ignored, throwable) -> {if (throwable != null) {// We failed to create the JobClient and must shutdown to ensure// cleanup.shutDownCluster(miniCluster);}}).thenApply(Function.identity());
}

至此,我们总算明白了,execute 到最后是开启了一个 MiniCluster 并将 JobGraph 作为参数提交任务。
而 MiniCluster 在官方文档上的解释为 本地执行 Flink jobs 的 mini 集群。

总结:

  • execute 的执行过程:
  • 转换 Transformation 为 StreamGraph
  • 提供执行需要的额外配置、监听方法等
  • 将 StreamGraph 转换为可执行的 JobGraph
  • 根据运行环境的不同创建不同的执行器
  • 在本地环境下,开启一个 MiniCluster 并将 JobGraph 提交任务执行

相关内容

热门资讯

股价波动较大!易点天下:15日... 【大河财立方消息】1月14日,易点天下网络科技股份有限公司(简称易点天下)公告称,公司股价波动较大,...
币圈院士:1.15比特币过关斩... 交易的根本是生存,其次才是收益,所以每次操作之前先想清楚自己的操作是否合理,本金是否安全,要形成一套...
专家解读 | 数据应用场景激活... 文 | 北京软件和信息服务业协会 国家数据局会同有关部门研究编制了《工业制造、现代农业等九个领域“数...
2026年十大危机公关公司榜单... 凌晨三点,某上市公司公关总监的手机被一条视频推送点亮。短短15秒的剪辑,将产品质量问题放大成了企业道...
原创 寒... 2025年寒冬的乌克兰,并非末日电影中的场景,而是残酷的现实。当零下十五度的凛冽寒流裹挟着鹅毛大雪,...
白银站稳90美元关口,白银矿业... 来源:环球市场播报 核心要点 现货白银价格于周二首次突破每盎司 90 美元,并在周三交易时段延续...
AI应用端全面爆发!4大黄金赛... 2026年1月14日周三的A股市场,AI应用端彻底点燃全场!浩瀚深度、壹网壹创等多只个股20CM涨停...
FXGT:平台监管合规与全球市... 本文探讨FXGT平台的核心优势,重点分析其监管合规性和全球市场连接的整合价值。通过严格的合规框架,F...
原创 1... 写在文章前的声明:在本文之前的说明:本文中所列的投资信息,只是一个对基金资产净值进行排行的客观描述,...
原创 美... 2026 年 1 月 13 日,美国多家媒体集中披露两条重磅消息,中国美债持仓降至 6887 亿美元...
融资保证金比例重回100%:A... "两融余额突破2.67万亿!"当这个数字刷屏各大财经媒体时,监管层的一纸通知瞬间引爆市场——融资保证...
靠中式精酿9个月狂卖11亿,河... 不到两年时间,一群“微醺女孩”把一家成立44年的河南地方啤酒厂推到IPO门口。 1月13日,河南金星...
原创 黄... 哈喽大家好,今天小无带大家聊聊最近刷屏的抢金热潮!金饰价格飙涨不停,一条项链一夜涨1.5万还被疯抢,...
原创 虚... 小睿就来深扒“纸上黄金”的IPO迷局,Suplay冲刺港股欲成“收藏卡第一股”,靠米哈游IP赚足利润...
北京CBD千亿规模国际级商圈初... 央广网北京1月14日消息(记者 王进文)1月14日,记者从北京市朝阳区两会新闻发布会上了解到,北京商...
原创 9... 什么样的酒能赢得市场? 2026年开年,A股市场的“分裂感”格外清晰。一边,是上证指数稳步站上410...
北方稀土设备供应商,广泰真空上... 来源丨时代商业研究院 作者丨陆烁宜 编辑丨郑琳 时隔3个月,“超长验收”项目披露的数量却翻倍,沈阳广...
热点城市启动新年“第一拍” 民... 来源:21世纪经济报道 21世纪经济报道记者 张敏 1月14日,青岛2026年首场宅地拍卖落锤。在市...
啤酒卖不动了,中式精酿能救金星... 在中国啤酒行业,已经很久没有出现真正意义上的 " 新故事 " 了。 过去十余年,这个一度被视为现金牛...
小组第二出线!U23亚洲杯-李... 北京时间1月14日消息,2026年U23亚洲杯小组赛继续进行,在D组最后一轮争夺中,中国U23男足迎...