Overview
1. Objective
Convert RDDs to DataFrames to read a text-file data source.
2. Task Requirements
Task: read external text-file data sources to create DataFrames, then use the DataFrame object's common Transformation and Action operations to implement the required functionality. Given the student (student), teacher (teacher), course (course), and score (score) information shown below, query this data with Spark SQL to produce each required result.
The student information file student.txt is shown below.
108,ZhangSan,male,1995/9/1,95033
105,KangWeiWei,female,1996/6/1,95031
107,GuiGui,male,1992/5/5,95033
101,WangFeng,male,1993/8/8,95031
106,LiuBing,female,1996/5/20,95033
109,DuBingYan,male,1995/5/21,95031
The teacher information file teacher.txt is shown below.
825,LinYu,male,1958,Associate professor,department of computer
804,DuMei,female,1962,Assistant professor,computer science department
888,RenLi,male,1972,Lecturer,department of electronic engineering
852,GongMOMO,female,1986,Associate professor,computer science department
864,DuanMu,male,1985,Assistant professor,department of computer
The course information file course.txt is shown below.
3-105,Introduction to computer,825
3-245,The operating system,804
6-101,Spark SQL,888
6-102,Spark,852
9-106,Scala,864
The score information file score.txt is shown below.
108,3-105,99
105,3-105,88
107,3-105,77
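Each file is plain comma-separated text, so one record can be parsed by splitting on commas and converting the score field to an integer. A minimal plain-Scala sketch of that parsing step (no Spark dependency; `parseScore` is a hypothetical helper name, mirroring the split/Row mapping used in the code below):

```scala
object ScoreLineParser {
  // Parse one "Sno,Cno,Degree" line from score.txt into a typed tuple.
  def parseScore(line: String): (String, String, Int) = {
    val f = line.split(",")
    (f(0), f(1).trim, f(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    println(parseScore("108,3-105,99"))  // (108,3-105,99)
  }
}
```

The same `split(",")` pattern appears in every `map(...)` over the raw RDDs in the code that follows.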
3. Code
package org.apache.spark
import org.apache.spark.sql.{Row, SparkSession,DataFrame}
import org.apache.spark.sql.types._
import scala.collection.mutable
object test {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("test")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
/** ************************ student table schema ******************/
import spark.implicits._
val studentRDD = spark.sparkContext.textFile(
"C:\\Users\\11359\\IdeaProjects\\untitled7\\educodersqldf\\student.txt")
//Define the schema (student number, name, sex, date of birth, class)
val schemaString = "Sno Sname Ssex Sbirthday SClass"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = false))
val schema = StructType(fields)
val rowRDD = studentRDD.map(_.split(","))
.map(elements => Row(elements(0),elements(1).trim,elements(2),elements(3),elements(4)))
//Convert to a DataFrame
val stuDF = spark.createDataFrame(rowRDD,schema)
//Register a temporary view
stuDF.createTempView("students")
/** ************************ teacher table schema ************************/
val teacherRDD = spark.sparkContext.textFile("C:\\Users\\11359\\IdeaProjects\\untitled7\\educodersqldf\\teacher.txt")
//Define the schema (teacher number (primary key), name, sex, birth year, title, department)
val schemaString1 = "Tno Tname Tsex Tyear Prof Depart"
val fields1 = schemaString1.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = false))
val schema1 = StructType(fields1)
val rowRDD1 = teacherRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2),elements(3),elements(4),elements(5)))
//Convert to a DataFrame
val teaDF = spark.createDataFrame(rowRDD1,schema1)
//Register a temporary view
teaDF.createTempView("teachers")
/** ************************ course table schema *****************************/
val courseRDD = spark.sparkContext.textFile("C:\\Users\\11359\\IdeaProjects\\untitled7\\educodersqldf\\course.txt")
//Define the schema (course number, course name, teacher number)
val schemaString2 = "Cno Cname Tno"
val fields2 = schemaString2.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = false))
val schema2 = StructType(fields2)
val rowRDD2 = courseRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2)))
//Convert to a DataFrame
val clasDF = spark.createDataFrame(rowRDD2,schema2)
//Register a temporary view
clasDF.createTempView("classes")
/** ************************ score table schema *****************************/
val scoreRDD = spark.sparkContext.textFile("C:\\Users\\11359\\IdeaProjects\\untitled7\\educodersqldf\\score.txt")
//Define the schema (student number (foreign key), course number (foreign key), score)
val Scoreschema:StructType=StructType(mutable.ArraySeq(
StructField("Sno",StringType,false),
StructField("Cno",StringType,false),
StructField("Degree",IntegerType,true)
))
val rowRDD3=scoreRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2).toInt))
//Convert to a DataFrame
val scoreDF = spark.createDataFrame(rowRDD3,Scoreschema)
//Register a temporary view
scoreDF.createTempView("score")
/** ************************ Queries on the tables *****************************/
//Display all student information ordered by class
spark.sql("SELECT * from students order by SClass").show()
//Query the Tname and Prof of teachers whose title is not held in both the computer department and the electronic engineering department.
spark.sql("SELECT Tname AS tname,Prof AS prof " +
"FROM teachers " +
"WHERE prof NOT IN (SELECT a.prof " +
"FROM (select Prof " +
"from teachers " +
"where Depart='department of computer'" +
") a " +
"join (select Prof " +
"from teachers " +
"where Depart='department of electronic engineering'" +
") b on a.Prof=b.Prof) order by Tname").show()
//Display information for female teachers
spark.sql("SELECT * from teachers where Tsex='female'").show()
//Display distinct teacher departments
spark.sql("SELECT DISTINCT Depart from teachers").show()
//Display the highest score
spark.sql("SELECT max(Degree) from score").show()
//Display each course's average score, ordered by course number
spark.sql("SELECT Cno,avg(Degree) from score group by Cno order by Cno").show()
}
}
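The final query above (`SELECT Cno, avg(Degree) ... GROUP BY Cno`) computes a per-course mean. What it produces can be sketched with plain Scala collections (no Spark dependency; `AvgByCourse` and its inline sample rows are illustrative only):

```scala
object AvgByCourse {
  // Group (Sno, Cno, Degree) rows by course number and average the scores,
  // mirroring "SELECT Cno, avg(Degree) FROM score GROUP BY Cno".
  def avgByCourse(rows: Seq[(String, String, Int)]): Map[String, Double] =
    rows.groupBy(_._2).map { case (cno, rs) =>
      cno -> rs.map(_._3).sum.toDouble / rs.size
    }

  def main(args: Array[String]): Unit = {
    // The three rows from score.txt: 99, 88 and 77 for course 3-105.
    val rows = Seq(("108", "3-105", 99), ("105", "3-105", 88), ("107", "3-105", 77))
    println(avgByCourse(rows))  // Map(3-105 -> 88.0)
  }
}
```

With the three sample rows, the only group is course 3-105 with mean (99 + 88 + 77) / 3 = 88.0.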
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable
object sparkSQL01{
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("sparkSQL01")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
/** ************************ student table schema *****************************/
import spark.implicits._
val studentRDD = spark.sparkContext.textFile("data/student.txt")
//Define the schema (student number, name, sex, date of birth, class)
val schemaString = "Sno Sname Ssex Sbirthday SClass"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = false))
val schema = StructType(fields)
val rowRDD = studentRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2),elements(3),elements(4)))
//Convert to a DataFrame
val stuDF = spark.createDataFrame(rowRDD,schema)
//Register a temporary view
stuDF.createOrReplaceTempView("students")
/** ************************ teacher table schema *****************************/
val teacherRDD = spark.sparkContext.textFile("data/teacher.txt")
//Define the schema (teacher number (primary key), name, sex, birth year, title, department)
val schemaString1 = "Tno Tname Tsex Tyear Prof Depart"
val fields1 = schemaString1.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = false))
val schema1 = StructType(fields1)
val rowRDD1 = teacherRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2),elements(3),elements(4),elements(5)))
//Convert to a DataFrame
val teaDF = spark.createDataFrame(rowRDD1,schema1)
//Register a temporary view
teaDF.createOrReplaceTempView("teachers")
/** ************************ course table schema *****************************/
val courseRDD = spark.sparkContext.textFile("data/course.txt")
//Define the schema (course number, course name, teacher number)
val schemaString2 = "Cno Cname Tno"
val fields2 = schemaString2.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = false))
val schema2 = StructType(fields2)
val rowRDD2 = courseRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2)))
//Convert to a DataFrame
val clasDF = spark.createDataFrame(rowRDD2,schema2)
//Register a temporary view
clasDF.createOrReplaceTempView("classes")
/** ************************ score table schema *****************************/
val scoreRDD = spark.sparkContext.textFile("data/score.txt")
//Define the schema (student number (foreign key), course number (foreign key), score)
val Scoreschema:StructType=StructType(mutable.ArraySeq(
StructField("Sno",StringType,false),
StructField("Cno",StringType,false),
StructField("Degree",IntegerType,true)
))
val rowRDD3=scoreRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2).toInt))
//Convert to a DataFrame
val scoreDF = spark.createDataFrame(rowRDD3,Scoreschema)
//Register a temporary view
scoreDF.createTempView("score")
/** ************************ Queries on the tables *****************************/
//Display all student information ordered by class
spark.sql("SELECT * from students order by SClass").show()
//Query the Tname and Prof of teachers whose title is not held in both the computer department and the electronic engineering department.
spark.sql("SELECT Tname AS tname,Prof AS prof " +
"FROM teachers " +
"WHERE prof NOT IN (SELECT a.prof " +
"FROM (select Prof " +
"from teachers " +
"where Depart='department of computer'" +
") a " +
"join (select Prof " +
"from teachers " +
"where Depart='department of electronic engineering'" +
") b on a.Prof=b.Prof) order by Tname").show()
//Display information for female teachers
spark.sql("SELECT * from teachers where Tsex='female'").show()
//Display distinct teacher departments
spark.sql("SELECT DISTINCT Depart from teachers").show()
//Display the highest score
spark.sql("SELECT MAX(Degree) from score").show()
//Display each course's average score, ordered by course number
spark.sql("SELECT Cno,avg(Degree) from score group by Cno order by Cno").show()
}
}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable
object sparkSQL02 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("test")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
/** ************************ student table schema *****************************/
val studentRDD = spark.sparkContext.textFile("data/student.txt")
val StudentSchema: StructType = StructType(mutable.ArraySeq( //student table
StructField("Sno", StringType, nullable = false), //student number
StructField("Sname", StringType, nullable = false), //student name
StructField("Ssex", StringType, nullable = false), //student sex
StructField("Sbirthday", StringType, nullable = true), //date of birth
StructField("SClass", StringType, nullable = true) //class
))
val studentData = studentRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
val studentDF = spark.createDataFrame(studentData,StudentSchema)
studentDF.createOrReplaceTempView("student")
/** ************************ teacher table schema *****************************/
val teacherRDD = spark.sparkContext.textFile("data/teacher.txt")
val TeacherSchema: StructType = StructType(mutable.ArraySeq( //teacher table
StructField("Tno", StringType, nullable = false), //teacher number (primary key)
StructField("Tname", StringType, nullable = false), //teacher name
StructField("Tsex", StringType, nullable = false), //teacher sex
StructField("Tyear", IntegerType, nullable = true), //birth year
StructField("Prof", StringType, nullable = true), //title
StructField("Depart", StringType, nullable = false) //department
))
val teacherData = teacherRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2),attributes(3).toInt,attributes(4),attributes(5)))
val teacherDF = spark.createDataFrame(teacherData,TeacherSchema)
teacherDF.createOrReplaceTempView("teacher")
/** ************************ course table schema *****************************/
val courseRDD = spark.sparkContext.textFile("data/course.txt")
val CourseSchema: StructType = StructType(mutable.ArraySeq( //course table
StructField("Cno", StringType, nullable = false), //course number
StructField("Cname", StringType, nullable = false), //course name
StructField("Tno", StringType, nullable = false) //teacher number
))
val courseData = courseRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2)))
val courseDF = spark.createDataFrame(courseData,CourseSchema)
courseDF.createOrReplaceTempView("course")
/** ************************ score table schema *****************************/
val scoreRDD = spark.sparkContext.textFile("data/score.txt")
val ScoreSchema: StructType = StructType(mutable.ArraySeq( //score table
StructField("Sno", StringType, nullable = false), //student number (foreign key)
StructField("Cno", StringType, nullable = false), //course number (foreign key)
StructField("Degree", IntegerType, nullable = true) //score
))
val scoreData = scoreRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2).toInt))
val scoreDF = spark.createDataFrame(scoreData,ScoreSchema)
scoreDF.createOrReplaceTempView("score")
/** ************************ Queries on the tables *****************************/
//Display all student information ordered by class
spark.sql("SELECT * FROM student ORDER BY SClass").show()
// Query the Tname and Prof of teachers whose title is not held in both the computer department and the electronic engineering department.
spark.sql("SELECT tname, prof " +
"FROM Teacher " +
"WHERE prof NOT IN (SELECT a.prof " +
"FROM (SELECT prof " +
"FROM Teacher " +
"WHERE depart = 'department of computer' " +
") a " +
"JOIN (SELECT prof " +
"FROM Teacher " +
"WHERE depart = 'department of electronic engineering' " +
") b ON a.prof = b.prof) ").orderBy("tname").show(false)
//Display information for female teachers
teacherDF.filter("Tsex = 'female'").show(false)
//Display distinct teacher departments
teacherDF.select("Depart").distinct().show(false)
//Display the highest score
scoreDF.agg("Degree" -> "max").show()
//Display each course's average score, ordered by course number
val meansc = scoreDF.groupBy("Cno").agg("Degree" -> "mean").orderBy("Cno")
meansc.show()
// meansc.write.format("json").save("mean.json")
}
}
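The nested NOT IN query used in all three versions can be read as a set operation: find the titles held in both departments, then keep teachers whose title is not in that intersection. A plain-Scala sketch under that reading (hypothetical `Teacher` case class and inline sample data; no Spark dependency):

```scala
object ProfDiffSketch {
  case class Teacher(tname: String, prof: String, depart: String)

  // Titles present in both departments correspond to the join of the two
  // subqueries; the outer NOT IN keeps teachers whose title falls outside it.
  def query(ts: Seq[Teacher]): Seq[String] = {
    val a = ts.filter(_.depart == "department of computer").map(_.prof).toSet
    val b = ts.filter(_.depart == "department of electronic engineering").map(_.prof).toSet
    val shared = a intersect b
    ts.filterNot(t => shared(t.prof)).map(_.tname).sorted
  }

  def main(args: Array[String]): Unit = {
    val ts = Seq(
      Teacher("LinYu", "Associate professor", "department of computer"),
      Teacher("RenLi", "Lecturer", "department of electronic engineering"),
      Teacher("DuanMu", "Assistant professor", "department of computer")
    )
    // No title is shared by the two departments here, so all three remain.
    println(query(ts))  // List(DuanMu, LinYu, RenLi)
  }
}
```

In the sample data above no title appears in both departments, so the intersection is empty and every teacher satisfies the NOT IN condition, which is also what the SQL version returns on the teacher.txt data.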
Conclusion
That is all of the Spark SQL material collected and organized here by 生动鲜花; hopefully it helps you solve the Spark SQL development problems you encounter.
This content was provided by netizens or collected from the web for learning reference; copyright belongs to the original authors.