【Flink源码】从StreamExecutionEnvironment.execute看Flink提交过程
admin
2024-01-20 06:37:11
0
env.execute("Order Count");

相信大家对这一行代码都不陌生,其作用是执行 Flink 程序,相当于是一个总开关。
很难想象,那么复杂的 Flink 架构,那么复杂的 Flink 程序仅仅需要这简单的一个函数就能启动,其背后究竟是怎样的过程?


execute 与 Flink 执行原理

StreamExecutionEnvironment.java

public JobExecutionResult execute() throws Exception {return execute((String) null);
}

execute 方法的参数为 jobName,若未指定则自动赋为 null

/**
* Triggers the program execution. The environment will execute all parts of the program that
* have resulted in a "sink" operation. Sink operations are for example printing results or
* forwarding them to a message queue.
*
* 

The program execution will be logged and displayed with the provided name * * @param jobName Desired name of the job * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception which occurs during job execution. */ public JobExecutionResult execute(String jobName) throws Exception {final List> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {Optional clusterDatasetCorruptedException =ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);if (!clusterDatasetCorruptedException.isPresent()) {throw t;}// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);} }

这一大段注释的大意是触发程序执行,环境将执行导致 sink 操作的程序的所有部分。
该方法首先通过 getStreamGraph 方法获取了 StreamGraph 对象。

public StreamGraph getStreamGraph() {return getStreamGraph(true);
}public StreamGraph getStreamGraph(boolean clearTransformations) {final StreamGraph streamGraph = getStreamGraph(transformations);if (clearTransformations) {transformations.clear();}return streamGraph;
}

由源码可知该方法的主要作用是获取流的执行图,若参数 clearTransformations 为 true(默认为 true)则清空 transformations。
这里的 transformations 是一个 List 的对象,包含一系列流的转换操作,而 Transformation 本身是一个抽象类,用于完成从输入流到输出流的转换,也就是我们常用的 map、filter 等转换算子其底层都是一棵 Transformation 树。任何一个 Flink 程序只要包含流的输入与输出都会存在一棵 Transformation 树。
Flink 程序会基于 Transformation 列表将其转化为 StreamGraph
这里清空 transformations 也就是清除这棵转换树。在完成了到 StreamGraph 的转换后清除树。
我们再继续往下看调用的 getStreamGraph(List> transformations)

private StreamGraph getStreamGraph(List> transformations) {synchronizeClusterDatasetStatus();return getStreamGraphGenerator(transformations).generate();
}private void synchronizeClusterDatasetStatus() {if (cachedTransformations.isEmpty()) {return;}Set completedClusterDatasets =listCompletedClusterDatasets().stream().map(AbstractID::new).collect(Collectors.toSet());cachedTransformations.forEach((id, transformation) -> {transformation.setCached(completedClusterDatasets.contains(id));});
}

synchronizeClusterDatasetStatus 顾名思义,同步集群数据集状态。
其中,cachedTransformations 是一个 Map> 变量,表示集群中各个数据集的缓存转换算子。synchronizeClusterDatasetStatus 方法就是将已完成算子到执行图转化的数据集列表与缓存列表同步。
接下来,调用 getStreamGraphGenerator 生成执行图。

public StreamGraph generateStreamGraph(List> transformations) {return getStreamGraphGenerator(transformations).generate();
}private StreamGraphGenerator getStreamGraphGenerator(List> transformations) {if (transformations.size() <= 0) {throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");}// We copy the transformation so that newly added transformations cannot intervene with the// stream graph generation.return new StreamGraphGenerator(new ArrayList<>(transformations), config, checkpointCfg, configuration).setStateBackend(defaultStateBackend).setChangelogStateBackendEnabled(changelogStateBackendEnabled).setSavepointDir(defaultSavepointDirectory).setChaining(isChainingEnabled).setUserArtifacts(cacheFile).setTimeCharacteristic(timeCharacteristic).setDefaultBufferTimeout(bufferTimeout).setSlotSharingGroupResource(slotSharingGroupResources);
}

至此,我们终于找到了真正生成执行图的类 StreamGraphGenerator。这个我们稍后再说。
相信读者看到这里可能都忘了我们开始的地方,现在我们回到最初的 execute() 方法。

public JobExecutionResult execute(String jobName) throws Exception {final List> originalTransformations = new ArrayList<>(transformations);StreamGraph streamGraph = getStreamGraph();if (jobName != null) {streamGraph.setJobName(jobName);}try {return execute(streamGraph);} catch (Throwable t) {Optional clusterDatasetCorruptedException =ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);if (!clusterDatasetCorruptedException.isPresent()) {throw t;}// Retry without cache if it is caused by corrupted cluster dataset.invalidateCacheTransformations(originalTransformations);streamGraph = getStreamGraph(originalTransformations);return execute(streamGraph);}
}

在完成了执行图的生成后,调用 execute(streamGraph),将执行图赋给执行程序,并在出错后重新获取执行图再次执行。
接下来我们继续看 execute(streamGraph)

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {final JobClient jobClient = executeAsync(streamGraph);try {final JobExecutionResult jobExecutionResult;if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {jobExecutionResult = jobClient.getJobExecutionResult().get();} else {jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());}jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));return jobExecutionResult;} catch (Throwable t) {// get() on the JobExecutionResult Future will throw an ExecutionException. This// behaviour was largely not there in Flink versions before the PipelineExecutor// refactoring so we should strip that exception.Throwable strippedException = ExceptionUtils.stripExecutionException(t);jobListeners.forEach(jobListener -> {jobListener.onJobExecuted(null, strippedException);});ExceptionUtils.rethrowException(strippedException);// never reached, only make javac happyreturn null;}
}

这个方法做了两件事:

  • 调用真正执行的方法 executeAsync(streamGraph)返回 JobClient
  • 针对执行结果,通过 jobClient.getJobExecutionResult().get() 获取

这里特别要提一下,JobClient 接口是任务执行的起点,负责接受用户的程序代码,然后创建数据流,将数据流提交给 JobManager 以便进一步执行。执行完成后,将结果返回给用户。这里就是通过 JobClient 取出执行结果 JobExecutionResult 对象。

不知道你是否注意到,任务完成后会执行 jobListeners 的 forEach 操作。jobListener 是 List 变量。
关于 JobListner 接口,源码注释如下:

/*** A listener that is notified on specific job status changed, which should be firstly registered by* {@code #registerJobListener} of execution environments.** 

It is highly recommended NOT to perform any blocking operation inside the callbacks. If you* block the thread the invoker of environment execute methods is possibly blocked.*/ @PublicEvolving public interface JobListener {

大意是在特定作业状态更改时被通知的侦听器,在 StreamExecutionEnvironment 中通过 registrJobListener 方法注册

public void registerJobListener(JobListener jobListener) {checkNotNull(jobListener, "JobListener cannot be null");jobListeners.add(jobListener);
}

而在任务执行完成后,会将其置为 null,表示执行 finished。
接下来我们继续看真正执行 execute 操作的 executeAsync(StreamGraph streamGraph) 方法

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {checkNotNull(streamGraph, "StreamGraph cannot be null.");final PipelineExecutor executor = getPipelineExecutor();CompletableFuture jobClientFuture =executor.execute(streamGraph, configuration, userClassloader);try {JobClient jobClient = jobClientFuture.get();jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));collectIterators.clear();return jobClient;} catch (ExecutionException executionException) {final Throwable strippedException =ExceptionUtils.stripExecutionException(executionException);jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()),strippedException);}
}

兜了一大圈,终于找到真正执行的方法,异步方法。
PipelineExecutor 按源码注释的解释是负责用户作业执行的实体,它由PipelineFactory 根据配置中确定的 Flink 环境按 yarn、standalone、per-job、local 几种不同情况生产对应的 Pipeline。这一点可以在 getPipelineExecutor 方法中得到证实

private PipelineExecutor getPipelineExecutor() throws Exception {checkNotNull(configuration.get(DeploymentOptions.TARGET),"No execution.target specified in your configuration file.");final PipelineExecutorFactory executorFactory =executorServiceLoader.getExecutorFactory(configuration);checkNotNull(executorFactory,"Cannot find compatible factory for specified execution.target (=%s)",configuration.get(DeploymentOptions.TARGET));return executorFactory.getExecutor(configuration);
}

获取对应环境的 PipelineExecutor 后调用接口中的 execute 方法执行,并将执行图、配置、类加载器作为参数传入
要想进一步搞清执行逻辑,我们必须继续深入探究 Pipeline.execute 的执行逻辑。
我们再官方文档中找到继承 Pipeline 接口的类有 AbstractJobClusterExecutor, AbstractSessionClusterExecutor, EmbeddedExecutor, KubernetesSessionClusterExecutor, LocalExecutor, RemoteExecutor, YarnJobClusterExecutor, YarnSessionClusterExecutor
下面我们就本地执行为例,探究 LocalExecutor 执行原理

public CompletableFuture execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)throws Exception {checkNotNull(pipeline);checkNotNull(configuration);Configuration effectiveConfig = new Configuration();effectiveConfig.addAll(this.configuration);effectiveConfig.addAll(configuration);// we only support attached execution with the local executor.checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

该方法执行流程为:

  • 将实例化 LocalExecutor 时添加的额外配置和用户配置合并为一个 Configuration
  • 创建 JobGraph,作业执行图
  • 调用 createWithFactory 和 submitJob 提交任务

这里的 JobGraph 是由 StreamGraph 转化而来,转化过程看 getJobGraph 方法
关于 JobGraph 的获取方法,我们留待后面讨论
这里 PerJobMiniClusterFactory.createWithFactory 创建了一个 PerJobMiniClusterFactory 对象
submitJob 开始了一个 MiniCluster 并提交了一个任务,具体代码如下:

public CompletableFuture submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {MiniClusterConfiguration miniClusterConfig =getMiniClusterConfig(jobGraph.getMaximumParallelism());MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);miniCluster.start();return miniCluster.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(() ->miniCluster.getJobStatus(submissionResult.getJobID()).get(),() ->miniCluster.requestJobResult(submissionResult.getJobID()).get(),userCodeClassloader);return submissionResult;})).thenApply(result ->new MiniClusterJobClient(result.getJobID(),miniCluster,userCodeClassloader,MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER)).whenComplete((ignored, throwable) -> {if (throwable != null) {// We failed to create the JobClient and must shutdown to ensure// cleanup.shutDownCluster(miniCluster);}}).thenApply(Function.identity());
}

至此,我们总算明白了,execute 到最后是开启了一个 MiniCluster 并将 JobGraph 作为参数提交任务。
而 MiniCluster 在官方文档上的解释为 本地执行 Flink jobs 的 mini 集群。

总结:

  • execute 的执行过程:
  • 转换 Transformation 为 StreamGraph
  • 提供执行需要的额外配置、监听方法等
  • 将 StreamGraph 转换为可执行的 JobGraph
  • 根据运行环境的不同创建不同的执行器
  • 在本地环境下,开启一个 MiniCluster 并将 JobGraph 提交任务执行

相关内容

热门资讯

日常等车时看到的行业细节 干了五年户外广告投放,养成了一个职业病:但凡路过公交候车亭,总会多看两眼——不是看广告好不好看,而是...
黄金回收行业标准制定有哪些核心... 贵金属回购市场的需求背景 近年来随着黄金投资和消费市场的发展,黄金回收相关需求持续攀升。不同群体的诉...
全球黑色星期二!AI交易“崩盘... 【导读】AI交易为何“崩盘”? 中国基金报记者 泰勒 大家,你们今天还好吗?! AI交易在全球范围内...
原创 6... 年初抢金条的人还在站岗,如今金店柜台前冷冷清清 黄金又跌了。 6月23日,伦敦现货黄金价格日内急跌逾...
狂融294亿美元!SK海力士冲... 韩国股市再度迎来重磅消息。 周三,韩国存储芯片龙头SK海力士宣布,计划在7月10日登陆纳斯达克,通过...
比特币跌破6万!AI吸走资金、... 比特币正在为机构化转型付出代价。散户买盘萎缩、ETF资金持续外流、企业持仓者潜在抛售压力上升,加之A...
原创 默... 欧洲近期试图复刻1985年广场协议的剧本,德国总理默茨呼吁欧盟27国联合行动,要求中国签订类似协议以...
怎么选 泛娱乐赛道直播公司孵化... 泛娱乐直播创业的行业发展背景 近年来泛娱乐直播赛道持续保持增长态势,据公开数据资料显示,2024年国...
原创 腰... 最近黄金市场凉得彻底。各大品牌足金饰品克价跌破1300元关口,北京菜百6月21日报价已经掉到1260...
ST中装:公司主要银行账户已全... 证券之星消息,ST中装(002822)06月24日在投资者关系平台上答复投资者关心的问题。 投资者提...
2026年开窗机行业趋势与战略... 一、开篇引言:市场格局重塑下的选择逻辑 步入2026年,全球建筑智能化与绿色节能政策的叠加驱动,使开...
资金全面转向科技,传统消费企业... 近期 A 股出现明显风格切换,老牌消费资金持续流出,机构与传统上市公司纷纷加码半导体、算力赛道。 先...
合肥保利翡翠天奕具体交房时间是... 对于众多购房者而言,“合肥保利翡翠天奕具体交房时间是什么时候?能按时交房吗?”是心中最关切的问题。根...
港股风向标|恒指连续杀跌后企稳... 财联社6月24日讯(编辑 冯轶)今日港股短线企稳,三大指数集体收涨。截至收盘,恒生指数涨0.33%,...
瑞众人寿达州中支被罚17万,涉... 蓝鲸新闻6月24日讯,近日,国家金融监督管理总局达州监管分局发布行政处罚决定书,剑指瑞众人寿保险有限...
美国最担心的事还是来了,中国加... 最近这段时间,国际金融圈子里有一笔账,算得各家央行心里都不太踏实。 截至2026年春季,美国国债总规...
马斯克,不是万亿富豪了 资产历史性超过万亿美元不到两周,特斯拉、SpaceX掌门人埃隆·马斯克的身价近日快速下跌。 据中新经...
突发!金价跌破4000美元,近... 每经记者:杜宇 记者|杜宇 编辑|何小桃 杜恒峰 校对|金冥羽 金银价格大跳水。 6月24日晚,现货...
粗粮吃越多越好?很多糖友吃错升... 控糖圈一直流传多吃粗粮稳血糖,不少糖友直接三餐全吃粗粮、顿顿杂粮,不仅胃胀消化不良,餐后血糖反而不降...
持续大跌!刚刚,黄金跌破400... 潮新闻客户端 记者 吴恩慧 6月24日,贵金属再次大跌。 截至发稿时,现货黄金大跌近3%,跌破400...