我是靠谱客的博主 乐观黑夜,最近开发中收集的这篇文章主要介绍SparkSQL创建RDD:<4>动态创建Schema将非json格式的RDD转换成DataFrame【Java,Scala纯代码】,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
Java版本
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("rddStruct");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
/**
* 转换成Row类型的RDD
*/
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(
String.valueOf(s.split(",")[0]),
String.valueOf(s.split(",")[1]),
Integer.valueOf(s.split(",")[2])
);
}
});
/**
* 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库
*/
List<StructField> asList =Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();
sc.stop();
Scala版代码:
val conf = new SparkConf()
conf.setMaster("local").setAppName("rddStruct")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
val rowRDD = lineRDD.map { x => {
val split = x.split(",")
RowFactory.create(split(0),split(1),Integer.valueOf(split(2)))
} }
val schema = StructType(List(
StructField("id",StringType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)
))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.printSchema()
sc.stop()
鼓励一下我呗,谢谢你。
最后
以上就是乐观黑夜为你收集整理的SparkSQL创建RDD:<4>动态创建Schema将非json格式的RDD转换成DataFrame【Java,Scala纯代码】的全部内容,希望文章能够帮你解决SparkSQL创建RDD:<4>动态创建Schema将非json格式的RDD转换成DataFrame【Java,Scala纯代码】所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复