概述
先看一下flink中map算子的源码
/**
* Applies a Map transformation on a {@link DataStream}. The transformation
* calls a {@link MapFunction} for each element of the DataStream. Each
* MapFunction call returns exactly one element. The user can also extend
* {@link RichMapFunction} to gain access to other features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param mapper
* The MapFunction that is called for each element of the
* DataStream.
* @param <R>
* output type
* @return The transformed {@link DataStream}.
*/
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
官方表述是对于一个DataStream进行映射操作,每次转换都调用MapFunction函数,每次调用返回一个元素
其实也就是从一个DataStream到另一个DataStream过程中,对每一个元素进行一些转换操作,返回的是SingleOutPutStreamOperator泛型,好多DataStream的方法都返回它,比如map,flapmap,filter,process等,最终还是调用的transform方法来实现的,以下是源码
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
说那么多,现在开始上代码试试吧:
DataStream中map函数有两个重载的方法,即一个是传入lamada表达式,另一种是需要传入MapFunction函数
下面代码演示传入MapFunction函数,注意不要导错包,都需要导scala的包,不然报错
//获取环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
//准备数据,类型DataStreamSource
val dataStreamSource = env.fromElements("I Love You,I Love Chain")
val mapper=new MapFunction[String,String] {
override def map(value: String): String = {
value+"aaaaa"
}
}
dataStreamSource.map(mapper).print()
env.execute("flink map operator")
下面代码使用Lamada的方式: 其实这种方式里面还是调用了上面第一种的方式,然后再掉java源码中的map方法
另外,只有在scala的源码中,map才会有重载方法,而在java的源码中,就只有一个传入MapFunction函数一种方法
def main(args: Array[String]): Unit = {
//获取环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
//准备数据,类型DataStreamSource
val dataStreamSource = env.fromElements("I Love You,I Love Chain")
dataStreamSource.map(value=>value+"++++++").print()
env.execute("flink map operator")
最后
以上就是魁梧玉米为你收集整理的Flink 算子之map彻底阐述的全部内容,希望文章能够帮你解决Flink 算子之map彻底阐述所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复