概述
Spark连接Mysql与hive
连接Mysql
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
获取SparkSession对象
package com.qf.sql.day03
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* 演示SparkSql如何加载文件
*/
object _03TestMysql {
def main(args: Array[String]): Unit = {
write
read
}
def write: Unit = {
val spark = SparkSession.builder()
.appName("testload")
.master("local[*]")
.getOrCreate()
import spark.implicits._
/*
* 写入mysql中
* jdbc(url: String, table: String, properties: Properties)
*
* url:用于指定连接路径
* table: 指定表名
* properties:用于指定连接参数的其他属性,比如,用户名,密码等
*/
val df: DataFrame = spark.read
.csv("sql/country.csv")
.toDF("id","country","code")
val prop = new Properties()
prop.put("user","root")
prop.put("password","123456")
prop.put("driver","com.mysql.cj.jdbc.Driver")
/**
* 写出时的模式研究:
* SaveMode.Append 追加 ,注意主键的问题,如果有主键,可能会报错
* SaveMode.ErrorIfExists 如果存在就提示报错, 是默认模式
* SaveMode.Ignore 忽略 , 如果已经存在,则不插入
* SaveMode.Overwrite 覆盖, 如果已经存在,则删除,重新创建
*/
df.write.mode(SaveMode.Ignore)
.jdbc("jdbc:mysql://10.36.140.103:3306/mydb2?serverTimezone=UTC","country",prop)
spark.stop()
}
def read: Unit = {
val spark = SparkSession.builder()
.appName("testload")
.master("local[*]")
.getOrCreate()
import spark.implicits._
/*
* 读取mysql中的表数据
* jdbc(url: String, table: String, properties: Properties)
*
* url:用于指定连接路径
* table: 指定表名
* properties:用于指定连接参数的其他属性,比如,用户名,密码等
*/
val prop = new Properties()
prop.put("user","root")
prop.put("password","123456")
prop.put("driver","com.mysql.cj.jdbc.Driver")
val df: DataFrame = spark.read
.jdbc("jdbc:mysql://10.36.140.103:3306/mydb2?serverTimezone=UTC", "country", prop)
//df.show()
println(df.count())
spark.stop()
}
}
SparkSQL连接hive
- 将hive-site.xml拷贝到spark的conf目录下
- 将core-site.xml和hdfs-site.xml拷贝到spark的conf目录下
- 将mysql的驱动包,拷贝到spark的jars目录下
3.4.3 代码连接hive
1)读取hive中的表
1. 开启hive支持
2. 将hive-site.xml以及core-site.xml,hdfs-site.xml拷贝到resources目录下
3. 读取数据:
spark.read.table("库名.表名")
2)向hive中写数据
1. 开启hive支持
2. 将hive-site.xml以及core-site.xml,hdfs-site.xml拷贝到resources目录下
3. 要写的数据,可以来源于内存,也可以来源于外部文件
注意:
--如果是来源于外部文件,可能出现路径问题,默认读取的是hdfs上的文件,如果想要读取本地文件,需要添加file:///绝对路径
--也可能出现权限文件,可以使用如下方法:
System.setProperty("HADOOP_USER_NAME","root")
4. spark.write.saveAstable("库名.表名")
小贴士: 如果指定模式,只能使用下面几个
SaveMode.Append 默认模式
SaveMode.Overwrite
SaveMode.Ignore 依然使追加操作
object _04TestHive {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("testload").enableHiveSupport().getOrCreate()
import spark.implicits._
//val df: DataFrame = spark.read.csv("file:///D:\IDEAfile\SparkSql\sql\country.csv").toDF("id", "country", "code")
// df.write.saveAsTable("mydb2.country1")
val df: DataFrame = spark.read.table("mydb2.country")
df.show()
spark.stop()
}
}
最后
以上就是无私星星为你收集整理的Spark连接Mysql与Hive的全部内容,希望文章能够帮你解决Spark连接Mysql与Hive所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复