我是靠谱客的博主 唠叨战斗机,最近开发中收集的这篇文章主要介绍SparkSQL操作Hive Table(enableHiveSupport()),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Spark SQL支持对Hive的读写操作。然而因为Hive有很多依赖包,所以这些依赖包没有包含在默认的Spark包里面。如果Hive依赖的包能在classpath找到,Spark将会自动加载它们。需要注意的是,这些Hive依赖包必须复制到所有的工作节点上,因为它们为了能够访问存储在Hive的数据,会调用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xmlcore-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目录下面。
当使用Hive时,必须初始化一个支持Hive的SparkSession,用户即使没有部署一个Hive的环境仍然可以使用Hive。当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。
注意:从Spark 2.0.0版本开始,hive-site.xml里面的hive.metastore.warehouse.dir属性已经被spark.sql.warehouse.dir替代,用于指定warehouse的默认数据路径(必须有写权限)。

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables

String warehouseLocation = "/spark-warehouse";
// init spark session with hive support

SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.master("local[*]")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL

spark.sql("SELECT * FROM src").show();
// +---+-------+

// |key|
value|

// +---+-------+

// |238|val_238|

// | 86| val_86|

// |311|val_311|

// ...

// only showing top 20 rows

// Aggregation queries are also supported.

spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+

// |count(1)|

// +--------+

// |
500 |

// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.

Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DaraFrames are of type Row, which lets you to access each column by ordinal.

Dataset<String> stringsDS = sqlDF.map(row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING());
stringsDS.show();
// +--------------------+

// |
value|

// +--------------------+

// |Key: 0, Value: val_0|

// |Key: 0, Value: val_0|

// |Key: 0, Value: val_0|

// ...

// You can also use DataFrames to create temporary views within a SparkSession.

List<Record> records = new ArrayList<Record>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.

spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+

// |key| value|key| value|

// +---+------+---+------+

// |
2| val_2|
2| val_2|

// |
2| val_2|
2| val_2|

// |
4| val_4|
4| val_4|

// ...

// only showing top 20 rows

如果使用eclipse运行上述代码的话需要添加spark-hive的jars,下面是maven的配置:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.0</version>
</dependency>

否则的话会遇到下面错误:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.
at org.apache.spark.sql.SparkSession$Builder.enableHiveSupport(SparkSession.scala:815)
at JavaSparkHiveExample.main(JavaSparkHiveExample.java:17)

与不同版本Hive Metastore的交互

Spark SQL对Hive的支持其中一个最重要的部分是与Hive metastore的交互,使得Spark SQL可以访问Hive表的元数据。从Spark 1.4.0版本开始,Spark SQL使用下面的配置可以用于查询不同版本的Hive metastores。需要注意的是,本质上Spark SQL会使用编译后的Hive 1.2.1版本的那些类来用于内部操作(serdes、UDFs、UDAFs等等)。

这里写图片描述

最后

以上就是唠叨战斗机为你收集整理的SparkSQL操作Hive Table(enableHiveSupport())的全部内容,希望文章能够帮你解决SparkSQL操作Hive Table(enableHiveSupport())所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部