概述
本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.
Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:
- 获取一个执行环境(execution environment);
- 加载/创建初始数据;
- 指定数据相关的转换;
- 指定计算结果的存储位置;
- 触发程序执行。
StreamExecutionEnvironment 是所有 Flink 程序的基础。
创建
你可以使用 StreamExecutionEnvironment 的如下静态方法获取 StreamExecutionEnvironment:
getExecutionEnvironment();
createLocalEnvironment();
createRemoteEnvironment(String host, int port, String... jarFiles);
通常,你只需要使用 getExecutionEnvironment() 即可,因为该方法会根据上下文做正确的处理:如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行你的程序。
跟一下 getExecutionEnvironment() 方法:
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
// 首先检查当前上下文是否存在可用的 EnvironmentFactory
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
// 若当前上下文存在可用的 EnvironmentFactory,则基于该工厂类创建 ExecutionEnvironment
.map(factory -> factory.createExecutionEnvironment(configuration))
// 若工厂类未能创建 ExecutionEnvironment ,则调用 createLocalEnvironment(configuration) 方法创建 LocalStreamEnvironment
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
// 会判断是否有设置默认并行度
if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
// 若有设置,则基于配置中的并行度创建 LocalStreamEnvironment
return new LocalStreamEnvironment(configuration);
} else {
// 否则将基于 defaultLocalParallelism 创建 LocalStreamEnvironment
// 其中,defaultLocalParallelism 为程序运行节点的核数
Configuration copyOfConfiguration = new Configuration();
copyOfConfiguration.addAll(configuration);
copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
return new LocalStreamEnvironment(copyOfConfiguration);
}
}
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
数据接入
addSource()
第1种数据接入的方法是 addSource()。
该方法的入参为 SourceFunction 接口的实现类。
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
// 若未传入 Source 名称,则默认名称为 Custom Source
// 若应用存在多个 Source,建议手动指定各个 Source 的名称
return addSource(function, "Custom Source");
}
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
return addSource(function, sourceName, null);
}
public <OUT> DataStreamSource<OUT> addSource(
SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
// Boundedness 是标识数据源是否无界的枚举
// CONTINUOUS_UNBOUNDED 代表是连续无界的数据源
// BOUNDED 代表是有界的数据源
return addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);
}
private <OUT> DataStreamSource<OUT> addSource(
final SourceFunction<OUT> function,
final String sourceName,
@Nullable final TypeInformation<OUT> typeInfo,
final Boundedness boundedness) {
checkNotNull(function);
checkNotNull(sourceName);
checkNotNull(boundedness);
// 会根据你传入的 SourceFunction 解析出源数据类型
TypeInformation<OUT> resolvedTypeInfo =
getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
// 判断是否为 ParallelSourceFunction
boolean isParallel = function instanceof ParallelSourceFunction;
// 此处会检查传入的 SourceFunction 符合规范
clean(function);
// 没啥问题,就将该 SourceFunction 转化为 StreamSource
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(
this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}
createInput()
第2种数据接入的方法是 createInput()。
该方法的入参为 InputFormat 接口的实现类。
@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
// TypeExtractor.getInputFormatTypes(inputFormat) 提取 Source 的类型
return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
}
@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(
InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
DataStreamSource<OUT> source;
// 判断是否为文件读取的 InputFormat
if (inputFormat instanceof FileInputFormat) {
@SuppressWarnings("unchecked")
FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
source =
createFileInput(
format,
typeInfo,
"Custom File source",
FileProcessingMode.PROCESS_ONCE,
-1);
} else {
source = createInput(inputFormat, typeInfo, "Custom Source");
}
return source;
}
当检测到传入的 InputFormat 为 FileInputFormat 时,会走文件读取的方法,即 createFileInput。
createFileInput 的参数中有1个 FileProcessingMode:
@PublicEvolving
public enum FileProcessingMode {
// 处理当前目录下现存的文件内容(一次性读取)
PROCESS_ONCE,
// 周期性扫描目录下的文件以读取新数据(持续性读取)
PROCESS_CONTINUOUSLY
}
继续跟一下 createFileInput 方法:
private <OUT> DataStreamSource<OUT> createFileInput(
FileInputFormat<OUT> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
long interval) {
Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
Preconditions.checkArgument(
monitoringMode.equals(FileProcessingMode.PROCESS_ONCE)
|| interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
"The path monitoring interval cannot be less than "
+ ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL
+ " ms.");
// 创建1个周期性扫描目录文件内容的 Function
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ContinuousFileMonitoringFunction<>(
inputFormat, monitoringMode, getParallelism(), interval);
// 定义文件读取工厂类
ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory =
new ContinuousFileReaderOperatorFactory<>(inputFormat);
// 此处,会根据传入的 FileProcessingMode 来生成 Boundedness
// FileProcessingMode.PROCESS_ONCE 对应 Boundedness.BOUNDED 有界
// FileProcessingMode.PROCESS_CONTINUOUSLY 对应 Boundedness.CONTINUOUS_UNBOUNDED 无界
final Boundedness boundedness =
monitoringMode == FileProcessingMode.PROCESS_ONCE
? Boundedness.BOUNDED
: Boundedness.CONTINUOUS_UNBOUNDED;
// 最后调用的还是 addSource() 方法
// SingleOutputStreamOperator 是 DataStreamSource 的父类
SingleOutputStreamOperator<OUT> source =
addSource(monitoringFunction, sourceName, null, boundedness)
.transform("Split Reader: " + sourceName, typeInfo, factory);
return new DataStreamSource<>(source);
}
非文件的 InputFormat 的话,会走另外1个 createInput 方法:
private <OUT> DataStreamSource<OUT> createInput(
InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo, String sourceName) {
// 将传入的 InputFormat 转化为 InputFormatSourceFunction
InputFormatSourceFunction<OUT> function =
new InputFormatSourceFunction<>(inputFormat, typeInfo);
// 最后调用的还是 addSource() 方法
return addSource(function, sourceName, typeInfo);
}
InputFormatSourceFunction 为 SourceFunction 接口的实现类,其继承链路如下:
SourceFunction-->ParallelSourceFunction-->RichParallelSourceFunction-->InputFormatSourceFunction
所以,createInput() 本质上调用的还是 addSource() 方法。
fromSource()
第3种数据接入的方法是 fromSource()。
该方法的入参为 Source 接口的实现类。
Flink 1.12 及以后,社区基于 FLIP-27 的改进计划,实现了1种新的 Source 架构。
如果你对 Flink 新的 Source 架构不熟悉的话,可以参阅我之前写好的1篇博客:
Flink进阶系列–FLIP-27新的Source架构
如果你想基于新的 Source 接入数据,则需要通过 StreamExecutionEnvironment.fromSource()。
// 第1个参数为 Source 接口的实现类
// 第2个参数为水印生成策略
// 第3个参数为 Source 名称
@PublicEvolving
public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName) {
return fromSource(source, timestampsAndWatermarks, sourceName, null);
}
@Experimental
public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
TypeInformation<OUT> typeInfo) {
// 基于传入的 Source 提取出数据源的类型
final TypeInformation<OUT> resolvedTypeInfo =
getTypeInfo(source, sourceName, Source.class, typeInfo);
return new DataStreamSource<>(
this,
checkNotNull(source, "source"),
checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
checkNotNull(resolvedTypeInfo),
checkNotNull(sourceName));
}
任务执行
一旦指定了完整的程序,需要调用 StreamExecutionEnvironment 的 execute() 方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在你的本地机器上触发,或将你的程序提交到某个集群上执行。
execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。
如果不想等待作业完成,可以通过调用 StreamExecutionEnvironment 的 executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,你可以通过它与刚刚提交的作业进行通信。如下是使用 executeAsync() 实现 execute() 语义的示例。
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
关于程序执行的最后一部分对于理解何时以及如何执行 Flink 算子是至关重要的。所有 Flink 程序都是延迟执行的:当程序的 main 方法被执行时,数据加载和转换不会直接发生。相反,每个算子都被创建并添加到 dataflow 形成的有向图。当执行被执行环境的 execute() 方法显示地触发时,这些算子才会真正执行。程序是在本地执行还是在集群上执行取决于执行环境的类型。
紧接着看一下源码:
public JobExecutionResult execute() throws Exception {
// getStreamGraph() 方法用于获取任务的 StreamGraph
return execute(getStreamGraph());
}
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraph(true);
}
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
if (transformations.size() <= 0) {
throw new IllegalStateException(
"No operators defined in streaming topology. Cannot execute.");
}
// We copy the transformation so that newly added transformations cannot intervene with the
// stream graph generation.
return new StreamGraphGenerator(
new ArrayList<>(transformations), config, checkpointCfg, configuration)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
.setSavepointDir(defaultSavepointDirectory)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout)
.setSlotSharingGroupResource(slotSharingGroupResources);
}
可以看出,execute() 提交的是任务的 StreamGraph,DataStreamEnvironment 持有整个任务的 transformations 列表,然后再基于 StreamGraphGenerator 类将 transformations 列表转化为 StreamGraph。
StreamGraphGenerator 类源码后面我们专门出1篇博客详细讲解,此处暂时不做讲解。
execute() 本质上调用的是 execute(StreamGraph streamGraph)。
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// 最终还是调用的 executeAsync() 方法
final JobClient jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
// 如果部署配置为 DeploymentOptions.ATTACHED 连接模式
// 此时,客户端需要等待任务执行完毕,然后调用 getJobExecutionResult().get() 方法获取执行结果
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
// jobClient.getJobExecutionResult() 返回的是1个 CompletableFuture<JobExecutionResult>
jobExecutionResult = jobClient.getJobExecutionResult().get();
// 若部署模式为非连接模式,则客户端无需关注执行结果,异步提交任务之后,直接构造1个 DetachedJobExecutionResult 返回即可
} else {
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
// 获取到任务执行结果之后,依次执行各个任务监听器的 onJobExecuted() 方法
jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
Throwable strippedException = ExceptionUtils.stripExecutionException(t);
jobListeners.forEach(
jobListener -> {
jobListener.onJobExecuted(null, strippedException);
});
ExceptionUtils.rethrowException(strippedException);
// never reached, only make javac happy
return null;
}
}
execute() 最终还是通过 executeAsync() 方法异步提交的任务。
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(
configuration.get(DeploymentOptions.TARGET),
"No execution.target specified in your configuration file.");
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
// 通过 PipelineExecutorFactory 提交 StreamGraph
CompletableFuture<JobClient> jobClientFuture =
executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
try {
JobClient jobClient = jobClientFuture.get();
// 获取到任务提交结果之后,依次执行各个任务监听器的 onJobSubmitted() 方法
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException =
ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(
jobListener -> jobListener.onJobSubmitted(null, strippedException));
throw new FlinkException(
String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
strippedException);
}
}
整个任务 execute() 过程包含2大步,而且均是异步的。
- 第1步是异步提交任务,提交之后直接返回 CompletableFuture,然后通过 CompletableFuture 的 get() 方法拿到提交结果后,执行各个任务监听器的 onJobSubmitted() 方法;
- 第2步是异步执行任务,调用之后直接返回 CompletableFuture,然后通过 CompletableFuture 的 get() 方法拿到任务执行结果后,执行各个任务监听器的 onJobExecuted() 方法。
任务监听器 JobListener:
@PublicEvolving
public interface JobListener {
// 当任务被提交后调用
void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);
// 当任务被执行完后调用
void onJobExecuted(
@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);
}
注册方法为:
@PublicEvolving
public void registerJobListener(JobListener jobListener) {
checkNotNull(jobListener, "JobListener cannot be null");
jobListeners.add(jobListener);
}
所以,如果你需要在任务提交之后及任务执行完之后执行某些操作(如日志埋点、回调接口等),可以通过注册 JobListener 的方式轻松实现。
本文到此结束,感谢阅读!
最后
以上就是奋斗奇异果为你收集整理的Flink源码解析系列--StreamExecutionEnvironment类的全部内容,希望文章能够帮你解决Flink源码解析系列--StreamExecutionEnvironment类所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复