深入浅出 | flink minicluster 启动流程

目录

  1. 目录

  2. flink 怎么判断是本地环境还是集群环境的?

  3. flink 的接口都是声明式的,那么到底把我们的接口解析或者封装成什么样进行执行了?

flink 怎么判断是本地环境还是集群环境的?

环境判断

1

通过 debug StreamExecutionEnvironment.getExecutionEnvironment(); 可以定位到下面的代码

8

我们可以看注释,解释一下,基本流程如下:

  1. 先尝试获取本地环境,即从 threadLocalContextEnvironmentFactory 获取环境配置
  2. 获取不到本地环境配置,就获取外部环境配置,即从 contextEnvironmentFactory 获取环境配置
1
2
3
4
5
6
7
8
9
10
11
12
13
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}

public static <T> Optional<T> resolveFactory(
ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) {
final T localFactory = threadLocalFactory.get();
final T factory = localFactory == null ? staticFactory : localFactory;

return Optional.ofNullable(factory);
}

那么问题来了,当集群环境时,是如何将 StreamExecutionEnvironmentFactory 到 ThreadLocal 中?

集群环境

通过 bin/flink run ….命令提交jar包到集群运行命令时,该脚本会调用org.apache.flink.client.cli.CliFrontend 来运行用户程序,如下:

1
2
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

在CliFrontend中依次执行以下方法 main() ->parseParameters() -> run() ->executeProgram()

1
2
3
4
5
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
throws ProgramInvocationException {
ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(), configuration, program, false, false);
}

在org.apache.flink.client.ClientUtils的executeProgram()中调用StreamContextEnvironment.setAsContext(…),StreamContextEnvironment继承自StreamExecutionEnvironment。setAsContext()代码如下

1
2
3
4
5
6
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);

创建生成运行环境的工厂类实例,在initializeContextEnvironment()方法中把实例放到StreamExecutionEnvironment类的静态属性threadLocalContextEnvironmentFactory 中,代码如下

1
2
3
4
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}

这样在用户程序StreamExecutionEnvironment.getExecutionEnvironment()时,获取到的运行环境就是StreamContextEnvironment类的setAsContext()方法中生成的
本地运行环境LocalStreamEnvironment和 独立集群、flink on yarn等运行环境StreamContextEnvironment 的主要区别在于,他们的成员属性 configuration 不同。LocalStreamEnvironment 中是创建的空键值对(new Configuration()),而StreamContextEnvironment 是通过CliFrontend 生成的Configuration对象。

flink 的接口都是声明式的,那么到底把我们的接口解析或者封装成什么样进行执行了?

声明式编程

https://www.cnblogs.com/sirkevin/p/8283110.html

9

从上述整体的架构可以看出来,flink 的架构是区分客户端和服务端的,我们在客户端编写号代码,然后提交代码到服务端进行运行
而且 flink 的接口是声明式的,那么这个过程中肯定会涉及到代码的包装执行。具体代码进行了哪些包装,如下图 4 个图的流程就是 flink 从客户端提交代码到服务端具体执行的整个解析过程:

10

1. StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。

  • StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
  • StreamEdge:表示连接两个 StreamNode 的边。
  • 是在客户端生成的,在 flink 中是一个具体的数据结构。

11

2. JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。

  • JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
  • IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
  • JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。
  • 是在客户端生成的,在 flink 中是一个具体的数据结构。

12

3. ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。

  • ExecutionJobVertex:和 JobGraph 中的 JobVertex 一一对应。每一个 ExecutionJobVertex 都有和并发度一样多的 ExecutionVertex。
  • ExecutionVertex:表示 ExecutionJobVertex 的其中一个并发子任务,输入是 ExecutionEdge,输出是 IntermediateResultPartition。
  • IntermediateResult:和 JobGraph 中的 IntermediateDataSet 一一对应。一个 IntermediateResult 包含多个 IntermediateResultPartition,其个数等于该 operator 的并发度。
  • IntermediateResultPartition:表示 ExecutionVertex 的一个输出分区,producer 是 ExecutionVertex,consumer 是若干个 ExecutionEdge。
  • ExecutionEdge:表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target 是 ExecutionVertex。source 和 target 都只能是一个。
  • Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM 和 TM 之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。
  • 是在服务端生成的,在 flink 中是一个具体的数据结构。

13

4. 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

  • Task:Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
  • ResultPartition:代表由一个Task的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。
  • ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。
  • InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费了一个或多个的ResultPartition。
  • InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。
  • 是在服务端生成的,不是具体的数据结构,而是实际的物理执行的一个图描述。

那么 Flink 为什么要设计这4张图呢,其目的是什么呢?Spark 中也有多张图,数据依赖图以及物理执行的 DAG。其目的都是一样的,就是解耦,每张图各司其职,每张图对应了 Job 不同的阶段,更方便做该阶段的事情。我们给出更完整的 Flink Graph 的层次图。
14

首先我们看到,JobGraph 之上除了 StreamGraph 还有 OptimizedPlan。OptimizedPlan 是由 Batch API 转换而来的。StreamGraph 是由 Stream API 转换而来的。为什么 API 不直接转换成 JobGraph?因为,Batch 和 Stream 的图结构和优化方法有很大的区别,比如 Batch 有很多执行前的预分析用来优化图的执行,而这种优化并不普适于 Stream,所以通过 OptimizedPlan 来做 Batch 的优化会更方便和清晰,也不会影响 Stream。JobGraph 的责任就是统一 Batch 和 Stream 的图,用来描述清楚一个拓扑图的结构,并且做了 chaining 的优化,chaining 是普适于 Batch 和 Stream 的,所以在这一层做掉。ExecutionGraph 的责任是方便调度和各个 tasks 状态的监控和跟踪,所以 ExecutionGraph 是并行化的 JobGraph。而“物理执行图”就是最终分布式在各个机器上运行着的tasks了。所以可以看到,这种解耦方式极大地方便了我们在各个层所做的工作,各个层之间是相互隔离的。

如何生成 StreamGraph?

基于 1.12 源码。

StreamGraph 是在客户端构造的,这意味着我们可以在本地通过调试观察 StreamGraph 的构造过程。
StreamGraph 生成的整个入口函数执行顺序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 开始执行代码
public JobExecutionResult execute() throws Exception {
return execute(getJobName());
}

public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

return execute(getStreamGraph(jobName));
}

// 生成 StreamGraph
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
...
return streamGraph;
}

// 获取 StreamGraphGenerator
private StreamGraphGenerator getStreamGraphGenerator() {
...

return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
.setRuntimeExecutionMode(executionMode)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout);
}

// 初始化 StreamGraphGenerator
public StreamGraphGenerator(
List<Transformation<?>> transformations,
ExecutionConfig executionConfig,
CheckpointConfig checkpointConfig,
ReadableConfig configuration) {
this.transformations = checkNotNull(transformations);
this.executionConfig = checkNotNull(executionConfig);
this.checkpointConfig = new CheckpointConfig(checkpointConfig);
this.configuration = checkNotNull(configuration);
}

// 客户端开始执行
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient jobClient = executeAsync(streamGraph);
...
}

可以看到初始化 StreamGraphGenerator 需要传入四个参数,其中我们最需要关心的参数就是第一个 transformations;

Transformation 是用来干啥的?

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/#datastream-transformations

Transformation 其实就是我们在代码中定义的各种转换操作,比如 map,filter,flatmap,keyby 等转换操作,经过 flink 的简单包装之后成为 Transformation,其之间的具体关系如下图:
16

经过这么多层的封装,肯定是为用户自定义算子增强或者填充了一些功能,具体是哪些功能呢?

举个例子:

1
2
3
4
5
6
7
8
9
10
11
environment
.fromElements(Tuple2.of(1, 1), Tuple2.of(2, 2), Tuple2.of(1, 3), Tuple2.of(2, 4), Tuple2.of(3, 10))
.map(t -> t)
.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer getKey(Tuple2<Integer, Integer> i) throws Exception {
return i.f0;
}
})
.flatMap(new CountWindowAverage())
.print();

先 debug 到 map(t -> t),然后我们来看 env 中的变量:
17

从这个变量来看就可以和上面的图映射上了,分别是 OneInputTransformation -> StreamMap -> UserFunction
我们仔细看这些变量,来查看到底哪些增强了哪些功能。

1. StreamMap
20

  • ChainingStrategy:flink 的优化策略中有本地性优化,就是会检查前一个算子和后一个算子是否能够 chain 在一起,这样的话就可以把数据的 shuffle 本地化,而 ChainingStrategy 就是这个优化的一个决定条件;

2. OneInputTransformation
19

  • input:负责将上游 Transformation 进行存储,之后可以利用这个上下游关系串联整个 DAG;
  • stateKeySelector:如果是 keyBy 后接 map,则 map 算子就会作用在具体 key 的数据上,这个就是对应的 KeySelector;
  • stateKeyType:同 stateKeySelector,存储的是具体的 key 的类型信息;
  • outputType:输出数据的类型信息;
  • typeUsed:TODO;
  • parallelism:算子并发度信息;
  • maxParallelism:算子最大并发度信息;
  • minResources,preferredResources:用户自定义细粒度的资源配置(这个 feature 还没有开放给用户);
  • managedMemoryWeight:TODO
  • uid:状态恢复时的状态唯一标识,恢复时会根据状态中的 uid 和算子中的 uid 进行匹配,匹配不到状态就不能恢复;
  • userProvidedNodeHash:TODO
  • bufferTimeout:TODO
  • slotSharingGroup:可以进行 slot 分享的 group 标识;
  • coLocationGroupKey:TODO

注意不同的 Transformation 做了不同的增强,常见的 Transformation 有以下几类,大家可以自己进行 debug 查看都有哪些增强。
15

另外,并不是每一个 Transformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition等。如下图所示的转换树,在运行时会优化成下方的操作图。

21
union、split/select、partition 中的信息会被写入到 Source –> Map 的边中。通过源码也可以发现,UnionTransformation,SplitTransformation,SelectTransformation,PartitionTransformation 由于不包含具体的操作所以都没有 StreamOperator 成员变量,而其他 Transformation 的子类基本上都有。

Transformation 的作用:

  • 1:用户在编写时定义的上下游算子。即串联用户定义的原始 DAG 上下游信息。
  • 2:用户在编写时定义的输入输出类型。即尝试获取算子输入输出类型信息以及对应的序列化器。
  • 3:用户在编写时定义的算子并行度。即计算出每一个算子的并行度信息。

StreamGraph 是用来干啥的?

先来看看 StreamGraph 实例

26

Transformation -> StreamGraph

我们来看看 StreamGraph 构建完成之后最主要的内容包含哪些?

28

DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。下图所示为 StreamOperator 的类图(点击查看大图):

22

23

可以发现,所有实现类都继承了AbstractStreamOperator。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator,该类是封装了UDF的StreamOperator。UDF就是实现了Function接口的类,如MapFunction,FilterFunction。

我们通过在 DataStream 上做了一系列的转换(map、filter等)得到了 Transformation 集合,然后通过 StreamGraphGenerator.generate 获得 StreamGraph,该方法的源码如下:

24

StreamGraphGenerator 中其实包含的最主要的内容就是各个类型 Transformation 的 Translator。如图所示;

25

每个 Translator 会将 Transformation 转换为对应的 StreamNode 或者 StreamEdge。

如何生成 JobGraph?

我们来看看 JobGraph 构建完成之后最主要的内容包含哪些?

29

JobGraph 的相关数据结构主要在 org.apache.flink.runtime.jobgraph 包中。构造 JobGraph 的代码主要集中在 StreamingJobGraphGenerator 类中,入口函数是 StreamingJobGraphGenerator.createJobGraph()。我们首先来看下 StreamingJobGraphGenerator 的核心源码:

StreamNode 转成 JobVertex,StreamEdge 转成 JobEdge,JobEdge 和 JobVertex 之间创建 IntermediateDataSet 来连接。关键点在于将多个 SteamNode chain 成一个 JobVertex的过程,这部分源码比较绕,有兴趣的同学可以结合源码单步调试分析。下一章将会介绍 JobGraph 提交到 JobManager 后是如何转换成分布式化的 ExecutionGraph 的。

每个 JobVertex 都会对应一个可序列化的 StreamConfig, 用来发送给 JobManager 和 TaskManager。最后在 TaskManager 中起 Task 时,需要从这里面反序列化出所需要的配置信息, 其中就包括了含有用户代码的StreamOperator。

setChaining会对source调用createChain方法,该方法会递归调用下游节点,从而构建出node chains。createChain会分析当前节点的出边,根据Operator Chains中的chainable条件,将出边分成chainalbe和noChainable两类,并分别递归调用自身方法。之后会将StreamNode中的配置信息序列化到StreamConfig中。如果当前不是chain中的子节点,则会构建 JobVertex 和 JobEdge相连。如果是chain中的子节点,则会将StreamConfig添加到该chain的config集合中。一个node chains,除了 headOfChain node会生成对应的 JobVertex,其余的nodes都是以序列化的形式写入到StreamConfig中,并保存到headOfChain的 CHAINED_TASK_CONFIG 配置项中。直到部署时,才会取出并生成对应的ChainOperators,具体过程请见理解 Operator Chains。

如何生成 StreamGraph?

参考文章

  1. http://wuchong.me/blog/2016/05/03/flink-internals-overview/
  2. http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph/
  3. https://outofmemory.cn/article-201939.html
  4. http://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/