概述
SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
/**
* 若想使用SparkSQL必须创建SQLContext 必须是传入SparkContext 不能是SparkConf
*/
SQLContext sqlContext = new SQLContext(sc);
/**
* 创建一个本地的集合 集合中元素的格式 json 类型String
*/
List<String> nameList = Arrays.asList(
"{'name':'zhangsan', 'age':55}",
"{'name':'lisi', 'age':30}",
"{'name':'lisisi', 'age':30}",
"{'name':'wangwu', 'age':19}");
List<String> scoreList = Arrays.asList(
"{'name':'zhangsan','score':100}",
"{'name':'lisi','score':99}" );
/**
* 并行化成一个rdd 现在rdd中元素格式 json格式
*/
JavaRDD<String> nameRDD = sc.parallelize(nameList);
JavaRDD<String> scoreRDD = sc.parallelize(scoreList);
DataFrame nameDF = sqlContext.read().json(nameRDD);
DataFrame scoreDF = sqlContext.read().json(scoreRDD);
/**
* SELECT nameTable.name,nameTable.age,scoreTable.score
* FROM nameTable JOIN nameTable ON (nameTable.name == scoreTable.name)
*/
nameDF.join(scoreDF, nameDF.col("name").$eq$eq$eq(scoreDF.col("name")))
.select(nameDF.col("name"),nameDF.col("age"),scoreDF.col("score")).show();
nameDF.registerTempTable("name");
scoreDF.registerTempTable("score");
String sql = "SELECT name.name,name.age,score.score "
+ "FROM name join score ON (name.name = score.name)";
sqlContext.sql(sql).show();
最后
以上就是美满红牛为你收集整理的11.2 DataFrame操作,集合转换的全部内容,希望文章能够帮你解决11.2 DataFrame操作,集合转换所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复