我是靠谱客的博主 奋斗毛豆,最近开发中收集的这篇文章主要介绍Spark数据存储到mysql中,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

import org.apache.spark.SparkContext

import org.bson.{BSONObject, BasicBSONObject}
/**
 * Created by ha1 on 6/30/15.
 */

object DFMain {
  case class Person(name: String, age: Double, t:String)

  def main (args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Scala Word Count")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val bsonRDD = sc.parallelize(("foo",1,"female")::("bar",2,"male")::("baz",-1,"female")::Nil).map(tuple=>{
      var bson = new BasicBSONObject()
      bson.put("name","bfoo")
      bson.put("value",0.1)
      bson.put("t","female")
      (null,bson)
    })
    val tDf = bsonRDD.map(_._2).map( f=> Person(f.get("name").toString, f.get("value").toString.toDouble, f.get("t").toString)).toDF()
    tDf.limit(1).show()
    val MYSQL_USRENAME = "root"
    val MYSQL_PWD = "root"
    val MYSQL_CONNECTION_URL = "jdbc:mysql://localhost/test?user="+MYSQL_USRENAME+"&password="+MYSQL_PWD
    tDf.createJDBCTable(MYSQL_CONNECTION_URL,"stest",true)


  }
}


需要引入的包,mysql-connector-java-xxx.jar,我选择的版本是5.1.15

spark-1.3.1

大家可能会觉得我的bsonRDD比较奇怪,其实不是,因为我需要将结果存入到mongodb和mysql, bsonRDD是存入mongodb的格式,大家重点关注tDf就可以了。

这个创建一个新的表的方式,另外一种是插入的方式

	
tDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "users", false);

这里需要提醒大家一句,这个函数在spark-1.4.0里是不被提倡或者被抛弃的,已经改用函数save()了,详情可以参考官方doc文件,


另外需要提醒大家的是,这个与数据库的连接用完之后是没有被释放的,也就是说,如果你的程序一直调用mysqlDao插入数据,那么很快就会达到你mysql数据库设计的最大连接数而导致出错,最好的办法是使用scala写一个类似于java的mysqlDao。

scala连接mysql 博客教程


参考资料:

Save apache spark dataframe to database

Spark SQL and DataFrame Guide


最后

以上就是奋斗毛豆为你收集整理的Spark数据存储到mysql中的全部内容,希望文章能够帮你解决Spark数据存储到mysql中所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部