我是靠谱客的博主 无私星星,最近开发中收集的这篇文章主要介绍Spark连接Mysql与Hive,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Spark连接Mysql与hive

连接Mysql
导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
</dependency>

获取SparkSession对象

package com.qf.sql.day03

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * 演示SparkSql如何加载文件
 */
object _03TestMysql {
    def main(args: Array[String]): Unit = {
        write
        read
    }
    def write: Unit = {
        val spark = SparkSession.builder()
          .appName("testload")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        /*
         *   写入mysql中
         *   jdbc(url: String, table: String, properties: Properties)
         *
         *     url:用于指定连接路径
         *     table: 指定表名
         *     properties:用于指定连接参数的其他属性,比如,用户名,密码等
         */
        val df: DataFrame = spark.read
          .csv("sql/country.csv")
          .toDF("id","country","code")

        val prop = new Properties()
        prop.put("user","root")
        prop.put("password","123456")
        prop.put("driver","com.mysql.cj.jdbc.Driver")

        /**
         * 写出时的模式研究:
         * SaveMode.Append      追加    ,注意主键的问题,如果有主键,可能会报错
         * SaveMode.ErrorIfExists  如果存在就提示报错,  是默认模式
         * SaveMode.Ignore     忽略  ,  如果已经存在,则不插入
         * SaveMode.Overwrite   覆盖,   如果已经存在,则删除,重新创建
         */
        df.write.mode(SaveMode.Ignore)
          .jdbc("jdbc:mysql://10.36.140.103:3306/mydb2?serverTimezone=UTC","country",prop)


        spark.stop()
    }

    def read: Unit = {
        val spark = SparkSession.builder()
          .appName("testload")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        /*
         *   读取mysql中的表数据
         *   jdbc(url: String, table: String, properties: Properties)
         *
         *     url:用于指定连接路径
         *     table: 指定表名
         *     properties:用于指定连接参数的其他属性,比如,用户名,密码等
         */
        val prop = new Properties()
        prop.put("user","root")
        prop.put("password","123456")
        prop.put("driver","com.mysql.cj.jdbc.Driver")

        val df: DataFrame = spark.read
          .jdbc("jdbc:mysql://10.36.140.103:3306/mydb2?serverTimezone=UTC", "country", prop)
        //df.show()
        println(df.count())
        
        spark.stop()
    }
}

SparkSQL连接hive

  1. 将hive-site.xml拷贝到spark的conf目录下
  2. 将core-site.xml和hdfs-site.xml拷贝到spark的conf目录下
  3. 将mysql的驱动包,拷贝到spark的jars目录下

3.4.3 代码连接hive

1)读取hive中的表

1. 开启hive支持
2. 将hive-site.xml以及core-site.xml,hdfs-site.xml拷贝到resources目录下
3. 读取数据:
	spark.read.table("库名.表名")

2)向hive中写数据

1. 开启hive支持
2. 将hive-site.xml以及core-site.xml,hdfs-site.xml拷贝到resources目录下
3. 要写的数据,可以来源于内存,也可以来源于外部文件
	注意:
	--如果是来源于外部文件,可能出现路径问题,默认读取的是hdfs上的文件,如果想要读取本地文件,需要添加file:///绝对路径
	--也可能出现权限文件,可以使用如下方法:
	 System.setProperty("HADOOP_USER_NAME","root")
4. spark.write.saveAstable("库名.表名")      	


小贴士:  如果指定模式,只能使用下面几个
	   SaveMode.Append        默认模式
	   SaveMode.Overwrite
	   SaveMode.Ignore       依然使追加操作
object _04TestHive {
  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME","root")
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("testload").enableHiveSupport().getOrCreate()
    import spark.implicits._

    //val df: DataFrame = spark.read.csv("file:///D:\IDEAfile\SparkSql\sql\country.csv").toDF("id", "country", "code")

   // df.write.saveAsTable("mydb2.country1")

    val df: DataFrame = spark.read.table("mydb2.country")
    df.show()
    spark.stop()

  }

}

最后

以上就是无私星星为你收集整理的Spark连接Mysql与Hive的全部内容,希望文章能够帮你解决Spark连接Mysql与Hive所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部