概述
将数据导入到mysql
object UrlGroupCount1 {
def main(args: Array[String]): Unit = {
//1.创建spark程序入口
val conf: SparkConf = new SparkConf().setAppName("UrlGroupCount1").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
sc.setCheckpointDir("hdfs:/")
//2.加载数据
val rdd1: RDD[String] = sc.textFile("c:/itstar.log")
//3.将数据切分
val rdd2: RDD[(String, Int)] = rdd1.map(line => {
val s: Array[String] = line.split("t")
//元祖输出
(s(1), 1)
})
//4.累加求和
val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_+_).cache()
//5.取出分组的学院
val rdd4: RDD[(String, Int)] = rdd3.map(x => {
val url = x._1
val host = new URL(url).getHost.split("[.]")(0)
//元祖输出
(host, x._2)
})
rdd4.checkpoint()
//6.根据学院分组
val rdd5: RDD[(String, List[(String, Int)])] = rdd4.groupBy(_._1).mapValues(it => {
//根据访问量排序 倒序
it.toList.sortBy(_._2).reverse.take(1)
})
//7.把计算结果保存到mysql中
rdd5.foreach(x => {
//把数据写到mysql
val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/urlcount?charatorEncoding=utf-8","root","root")
//把spark结果插入到mysql中
val sql = "INSERT INTO url_data (xueyuan,number_one) VALUES (?,?)"
//执行sql
val statement = conn.prepareStatement(sql)
statement.setString(1,x._1)
statement.setString(2,x._2.toString())
statement.executeUpdate()
statement.close()
conn.close()
})
//8.关闭资源 应用停掉
sc.stop()
}
}
jdbcRDD的方式导入Mysql
object JdbcRDDDemo {
def main(args: Array[String]): Unit = {
//1.加载数据
val conf: SparkConf = new SparkConf().setAppName("UrlCount").setMaster("local[2]")
//spark程序入口
val sc: SparkContext = new SparkContext(conf)
//匿名函数
val connection = () =>{
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/urlcount?charatorEncoding=utf-8","root","qq8911596")
}
//查询数据
val jdbcRdd: JdbcRDD[(Int, String, String)] = new JdbcRDD(
//指定sparkcontext
sc,
connection,
"SELECT * FROM pvc where uid >= ? AND uid <= ?",
//2个任务并行
1, 4, 2,
// >=1
<=4
2个任务并行
r => {
val uid = r.getInt(1)
val xueyuan = r.getString(2)
val number = r.getString(3)
(uid, xueyuan, number)
//设置输出格式,要哪个就写哪个
}
)
val jrdd = jdbcRdd.collect()
println(jrdd.toBuffer)
sc.stop()
}
}
JdbcRdd的参数:
1、getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
2、sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:“select title, author from books where ? < = id and id <= ?”
3、lowerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
4、mapRow 是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的case class。默认的是将ResultSet 转换成一个Object数组。
最后
以上就是独特唇膏为你收集整理的Spark-jdbcRDD-mysql的全部内容,希望文章能够帮你解决Spark-jdbcRDD-mysql所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复