我是靠谱客的博主 瘦瘦星月,最近开发中收集的这篇文章主要介绍Flink使用lambda表达式报错:InvalidTypesException:could not be determined automatically, due to type erasure.,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在写Flink程序的时候(以最简单的WordCount案例为例),有时会使用Lambda表达式来简化,如下边程序中的flatMap算子和Map算子处,都是用了Lambda表达式来简写:

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataSource = env.socketTextStream("10.12.36.102", 8888);

        //flatMap,lambda形式简写
        SingleOutputStreamOperator<String> wordStream = dataSource.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect));

        //map,lambda形式简写
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordStream.map(word -> Tuple2.of(word, 1));

        //分组求和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumStream = wordAndOne.keyBy(tp -> tp.f0).sum(1);

        sumStream.print();
        env.execute();

    }

但是简化成Lambda形式后,出现了如下错误:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
The return type of function 'main(StreamingWordCount2.java:31)' could not be determined automatically, due to type erasure. 
You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.


Caused by: org.apache.flink.api.common.functions.InvalidTypesException: 
The generic type parameters of 'Tuple2' are missing. 
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.

其实在报出的错误中表明的很清楚了:

在许多情况下,当涉及Java泛型时,lambda方法不能为自动类型提取提供足够的信息。也就是说由于类型擦除,无法自动确定。

You can give type information hints by using the returns(...) method on the result of the transformation call,这句话就是告诉我们可以在转换的算子之后调用returns(...)方法来显示指明要返回的数据类型信息。

具体在代码中如下所示:

//flatMap(见本行最后)
SingleOutputStreamOperator<String> wordStream = dataSource.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING);

//map(见本行最后)
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordStream.map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

如代码所示,

在flatMap算子之后,调用returns方法来显示指定返回类型为Types.STRING类型;

在map算子之后,调用returns方法来显示指定返回类型为Types.TUPLE元组类型,并且元组中的第一个元素是STRING类型,第二个元素是INT类型。

这样就可以解决问题啦~


我是smallk,自学大数据,拿到百度、京东、小米、顺丰、58、哈罗、海康等22家大数据offer,欢迎仍在数据路上的小伙伴,我们一起讨论前行。


最后

以上就是瘦瘦星月为你收集整理的Flink使用lambda表达式报错:InvalidTypesException:could not be determined automatically, due to type erasure.的全部内容,希望文章能够帮你解决Flink使用lambda表达式报错:InvalidTypesException:could not be determined automatically, due to type erasure.所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部