我是靠谱客的博主 魁梧玉米,最近开发中收集的这篇文章主要介绍Flink 算子之map彻底阐述,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

先看一下flink中map算子的源码

/**
 * Applies a Map transformation on a {@link DataStream}. The transformation
 * calls a {@link MapFunction} for each element of the DataStream. Each
 * MapFunction call returns exactly one element. The user can also extend
 * {@link RichMapFunction} to gain access to other features provided by the
 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 *
 * @param mapper
 *            The MapFunction that is called for each element of the
 *            DataStream.
 * @param <R>
 *            output type
 * @return The transformed {@link DataStream}.
 */
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
         Utils.getCallLocationName(), true);

   return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

官方表述是对于一个DataStream进行映射操作,每次转换都调用MapFunction函数,每次调用返回一个元素

其实也就是从一个DataStream到另一个DataStream过程中,对每一个元素进行一些转换操作,返回的是SingleOutPutStreamOperator泛型,好多DataStream的方法都返回它,比如map,flapmap,filter,process等,最终还是调用的transform方法来实现的,以下是源码

@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

   // read the output type of the input Transform to coax out errors about MissingTypeInfo
   transformation.getOutputType();

   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());

   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

   getExecutionEnvironment().addOperator(resultTransform);

   return returnStream;
}

说那么多,现在开始上代码试试吧:

DataStream中map函数有两个重载的方法,即一个是传入lamada表达式,另一种是需要传入MapFunction函数

下面代码演示传入MapFunction函数,注意不要导错包,都需要导scala的包,不然报错

//获取环境变量
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //准备数据,类型DataStreamSource
    val dataStreamSource = env.fromElements("I Love You,I Love Chain")

    val mapper=new MapFunction[String,String] {
      override def map(value: String): String = {
        value+"aaaaa"
      }
    }
      dataStreamSource.map(mapper).print()
    env.execute("flink map operator")

下面代码使用Lamada的方式: 其实这种方式里面还是调用了上面第一种的方式,然后再掉java源码中的map方法

另外,只有在scala的源码中,map才会有重载方法,而在java的源码中,就只有一个传入MapFunction函数一种方法

 def main(args: Array[String]): Unit = {
    //获取环境变量
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //准备数据,类型DataStreamSource
    val dataStreamSource = env.fromElements("I Love You,I Love Chain")


      dataStreamSource.map(value=>value+"++++++").print()
    env.execute("flink map operator")

 

 

 

 

 

 

 

 

 

最后

以上就是魁梧玉米为你收集整理的Flink 算子之map彻底阐述的全部内容,希望文章能够帮你解决Flink 算子之map彻底阐述所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(30)

评论列表共有 0 条评论

立即
投稿
返回
顶部