Hudi (8): Integrating Hudi with Spark in code
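Contents
0. Related articles
1. Environment setup
2. Inserting data
3. Querying data
4. Updating data
5. Point-in-time query
6. Incremental query
7. Deleting data
8. Overwriting data
9. Submitting and running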
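Preface: besides working interactively in the shell, you can use Hudi's Spark integration from a Spark program that you write yourself, package, and submit to the cluster.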
0. Related articles
Hudi article index
1. Environment setup
Create a Maven project with the following pom file:
<properties>
    <scala.version>2.12.10</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.2.2</spark.version>
    <hadoop.version>3.1.3</hadoop.version>
    <hudi.version>0.12.0</hudi.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
    <!-- Scala language library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Spark Hive support -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- hudi-spark3.2 -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark3.2-bundle_${scala.binary.version}</artifactId>
        <version>${hudi.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- fastjson <= 1.2.80 has known security vulnerabilities -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Assembly plugin: builds the jar-with-dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <archive>
                    <manifest>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
        <!-- Plugin that lets Maven compile Scala sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Inserting data
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SaveMode._
import scala.collection.JavaConverters._

object InsertDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val tableName = "hudi_trips_cow"
    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

    // Generate 10 sample trip records as JSON strings and load them into a DataFrame
    val dataGen = new DataGenerator
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(inserts.asScala.toSeq, 2))

    // Overwrite mode recreates the table if it already exists
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD.key(), "ts").
      option(RECORDKEY_FIELD.key(), "uuid").
      option(PARTITIONPATH_FIELD.key(), "partitionpath").
      option(TBL_NAME.key(), tableName).
      mode(Overwrite).
      save(basePath)

    // The application is done; release resources
    sparkSession.stop()
  }
}
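The option constants in the insert example stand for plain string keys. As a rough sketch, the same settings can be collected in an ordinary Scala Map, which can be convenient for logging them or sharing them between jobs. The object name HudiWriteOptions and its build method are made up for this illustration and are not part of Hudi. The key strings are what I understand the constants to resolve to, so check them against your Hudi version.

```scala
// Hypothetical helper (not part of Hudi): gathers the write options from the
// insert example as plain string keys. Key names are assumed to match what
// the DataSourceWriteOptions / HoodieWriteConfig constants resolve to.
object HudiWriteOptions {
  def build(tableName: String,
            recordKey: String = "uuid",
            precombine: String = "ts",
            partitionPath: String = "partitionpath",
            operation: Option[String] = None): Map[String, String] = {
    val base = Map(
      "hoodie.table.name" -> tableName,
      "hoodie.datasource.write.recordkey.field" -> recordKey,
      "hoodie.datasource.write.precombine.field" -> precombine,
      "hoodie.datasource.write.partitionpath.field" -> partitionPath
    )
    // Only set the operation when the caller asks for one, e.g. Some("delete")
    operation.fold(base)(op => base + ("hoodie.datasource.write.operation" -> op))
  }
}
```

Under these assumptions, a call such as df.write.format("hudi").options(HudiWriteOptions.build("hudi_trips_cow")) would take the place of the individual option(...) lines.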
3. Querying data
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object QueryDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      // .setMaster("local[*]") // left unset so that spark-submit supplies the master
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

    // Snapshot query: read the latest state of the table
    val tripsSnapshotDF = sparkSession.
      read.
      format("hudi").
      load(basePath)

    // Time travel query, variant 1
    // sparkSession.read.
    //   format("hudi").
    //   option("as.of.instant", "20210728141108100").
    //   load(basePath)
    //
    // Time travel query, variant 2
    // sparkSession.read.
    //   format("hudi").
    //   option("as.of.instant", "2021-07-28 14:11:08.200").
    //   load(basePath)
    //
    // Time travel query, variant 3: equivalent to "as.of.instant = 2021-07-28 00:00:00"
    // sparkSession.read.
    //   format("hudi").
    //   option("as.of.instant", "2021-07-28").
    //   load(basePath)

    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    sparkSession
      .sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0")
      .show()

    // The application is done; release resources
    sparkSession.stop()
  }
}
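The three time travel variants differ only in how the instant is spelled. To make the relationship concrete, here is a small sketch that rewrites all three spellings into the compact 17-digit form. InstantFormat is a hypothetical helper written for this post, not Hudi's own parser, and it only handles the three formats shown in the comments above.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not Hudi's parser): rewrites the three instant
// spellings from the commented-out examples into yyyyMMddHHmmssSSS.
object InstantFormat {
  private val compact = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
  private val spaced = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  def normalize(instant: String): String = instant.trim match {
    // Already in compact form: return unchanged
    case s if s.matches("\\d{17}") => s
    // Date only: interpreted as midnight at the start of that day
    case s if s.matches("\\d{4}-\\d{2}-\\d{2}") =>
      LocalDate.parse(s).atStartOfDay().format(compact)
    // Full timestamp with milliseconds
    case s => LocalDateTime.parse(s, spaced).format(compact)
  }
}
```

For example, InstantFormat.normalize("2021-07-28") yields "20210728000000000", which matches the note on variant 3 that a bare date means midnight of that day.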
4. Updating data
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SaveMode._
import scala.collection.JavaConverters._

object UpdateDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val tableName = "hudi_trips_cow"
    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"
    val dataGen = new DataGenerator

    // Upsert a batch of JSON records; Append mode keeps the existing table data
    def upsert(records: java.util.List[String]): Unit = {
      val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(records.asScala.toSeq, 2))
      df.write.format("hudi").
        options(getQuickstartWriteConfigs).
        option(PRECOMBINE_FIELD.key(), "ts").
        option(RECORDKEY_FIELD.key(), "uuid").
        option(PARTITIONPATH_FIELD.key(), "partitionpath").
        option(TBL_NAME.key(), tableName).
        mode(Append).
        save(basePath)
    }

    // generateUpdates draws from keys this DataGenerator instance has already
    // produced, so a fresh generator is seeded with inserts before updating
    upsert(convertToStringList(dataGen.generateInserts(10)))
    upsert(convertToStringList(dataGen.generateUpdates(10)))

    // Optional check: query the table again after the update
    // val tripsSnapshotDF = sparkSession.
    //   read.
    //   format("hudi").
    //   load(basePath)
    // tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
    //
    // sparkSession
    //   .sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0")
    //   .show()

    // The application is done; release resources
    sparkSession.stop()
  }
}
5. Point-in-time query
import org.apache.hudi.DataSourceReadOptions._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object PointInTimeQueryDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

    // Register the snapshot view that the commit-time query below reads from
    sparkSession.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

    import sparkSession.implicits._
    val commits = sparkSession.
      sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").
      map(k => k.getString(0)).
      take(50)
    require(commits.length >= 2, "this demo needs at least two commits in the table")

    val beginTime = "000" // start of the timeline, i.e. the earliest possible commit time
    val endTime = commits(commits.length - 2) // second-to-last commit

    // Incremental query bounded by an end instant: the table as of endTime
    val tripsIncrementalDF = sparkSession.read.format("hudi").
      option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME.key(), beginTime).
      option(END_INSTANTTIME.key(), endTime).
      load(basePath)
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_point_in_time")

    sparkSession.
      sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").
      show()

    // The application is done; release resources
    sparkSession.stop()
  }
}
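Both this demo and the incremental query demo pick the second-to-last commit out of the sorted commit list, which fails with an index error when the table has fewer than two commits. The sketch below isolates that selection so the edge case is explicit. CommitWindow and its method names are hypothetical and exist only for illustration.

```scala
// Hypothetical helper: chooses instant bounds from commit times that are
// already sorted in ascending order, returning None when there are too few.
object CommitWindow {
  // Point-in-time: from the start of the timeline ("000") up to the
  // second-to-last commit
  def pointInTime(commits: Seq[String]): Option[(String, String)] =
    if (commits.length >= 2) Some(("000", commits(commits.length - 2))) else None

  // Incremental: begin at the second-to-last commit, with no end bound
  def incrementalBegin(commits: Seq[String]): Option[String] =
    if (commits.length >= 2) Some(commits(commits.length - 2)) else None
}
```

Returning an Option leaves it to the caller to decide what to do with a table that has only one commit, rather than failing inside the index expression.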
6. Incremental query
import org.apache.hudi.DataSourceReadOptions._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object IncrementalQueryDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

    // Register the snapshot view that the commit-time query below reads from
    sparkSession.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

    import sparkSession.implicits._
    val commits = sparkSession.
      sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").
      map(k => k.getString(0)).
      take(50)
    require(commits.length >= 2, "this demo needs at least two commits in the table")

    val beginTime = commits(commits.length - 2) // second-to-last commit

    // Incremental query: only records written after beginTime
    val tripsIncrementalDF = sparkSession.read.format("hudi").
      option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME.key(), beginTime).
      load(basePath)
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

    sparkSession.
      sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").
      show()

    // The application is done; release resources
    sparkSession.stop()
  }
}
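The point-in-time and incremental demos use the same query type and differ only in whether an end instant is set. A minimal sketch of that relationship, using plain string keys: HudiIncrementalOptions is a made-up name, and the key strings are what I understand QUERY_TYPE, BEGIN_INSTANTTIME and END_INSTANTTIME to resolve to, so verify them against your Hudi version.

```scala
// Hypothetical helper (not part of Hudi): read options for an incremental
// query, with an optional end instant for the point-in-time case.
object HudiIncrementalOptions {
  def build(beginTime: String, endTime: Option[String] = None): Map[String, String] = {
    val base = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.read.begin.instanttime" -> beginTime
    )
    // Adding an end instant turns the open-ended incremental read into a
    // bounded, point-in-time style read
    endTime.fold(base)(end => base + ("hoodie.datasource.read.end.instanttime" -> end))
  }
}
```

With this sketch, build(beginTime) corresponds to the incremental demo and build("000", Some(endTime)) to the point-in-time demo.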
7. Deleting data
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SaveMode._
import scala.collection.JavaConverters._

object DeleteDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val tableName = "hudi_trips_cow"
    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"
    val dataGen = new DataGenerator

    sparkSession.
      read.
      format("hudi").
      load(basePath).
      createOrReplaceTempView("hudi_trips_snapshot")

    // Row count before the delete
    val countBefore = sparkSession.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
    println(s"rows before delete: $countBefore")

    // Pick two existing records and turn them into delete payloads
    val ds = sparkSession.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
    val deletes = dataGen.generateDeletes(ds.collectAsList())
    val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(deletes.asScala.toSeq, 2))

    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION.key(), "delete").
      option(PRECOMBINE_FIELD.key(), "ts").
      option(RECORDKEY_FIELD.key(), "uuid").
      option(PARTITIONPATH_FIELD.key(), "partitionpath").
      option(TBL_NAME.key(), tableName).
      mode(Append).
      save(basePath)

    val roAfterDeleteViewDF = sparkSession.
      read.
      format("hudi").
      load(basePath)
    roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")

    // The total row count should now be 2 lower than before
    val countAfter = sparkSession.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
    println(s"rows after delete: $countAfter")

    // The application is done; release resources
    sparkSession.stop()
  }
}
8. Overwriting data
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SaveMode._
import scala.collection.JavaConverters._

object InsertOverwriteDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    val tableName = "hudi_trips_cow"
    val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"
    val dataGen = new DataGenerator

    // Keys per partition before the overwrite
    sparkSession.
      read.format("hudi").
      load(basePath).
      select("uuid", "partitionpath").
      sort("partitionpath", "uuid").
      show(100, false)

    // Generate new records and keep only those for the San Francisco partition
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(inserts.asScala.toSeq, 2)).
      filter("partitionpath = 'americas/united_states/san_francisco'")

    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION.key(), "insert_overwrite").
      option(PRECOMBINE_FIELD.key(), "ts").
      option(RECORDKEY_FIELD.key(), "uuid").
      option(PARTITIONPATH_FIELD.key(), "partitionpath").
      option(TBL_NAME.key(), tableName).
      mode(Append).
      save(basePath)

    // Keys per partition after the overwrite: only the San Francisco partition should differ
    sparkSession.
      read.format("hudi").
      load(basePath).
      select("uuid", "partitionpath").
      sort("partitionpath", "uuid").
      show(100, false)

    // The application is done; release resources
    sparkSession.stop()
  }
}
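The before and after listings in the demo are easier to read with the semantics of insert_overwrite in mind: partitions that receive records in the incoming batch are replaced as a whole, and partitions that receive nothing are left alone. The toy model below imitates that behaviour on an in-memory Map. It is only an illustration under that assumption and does not touch Hudi; InsertOverwriteModel and its method are invented names.

```scala
// Toy in-memory model of insert_overwrite (no Hudi involved): a table is a
// map from partition path to the set of record keys stored in that partition.
object InsertOverwriteModel {
  def insertOverwrite(table: Map[String, Set[String]],
                      incoming: Seq[(String, String)]): Map[String, Set[String]] = {
    // Group the incoming (partition, key) pairs by partition
    val byPartition: Map[String, Set[String]] =
      incoming.groupBy(_._1).map { case (partition, rows) => partition -> rows.map(_._2).toSet }
    // Partitions present in the batch are replaced; all others are kept
    table ++ byPartition
  }
}
```

In the demo, the incoming batch is filtered down to americas/united_states/san_francisco, so in this model only that partition's key set would change.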
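9. Submitting and running
Package the code into a jar and upload it to the cluster (the author uses a directory named myjars; the command below assumes the jar is under /opt/jars). Then submit it, using QueryDemo as the example:
spark-submit \
  --class com.atguigu.hudi.spark.QueryDemo \
  /opt/jars/spark-hudi-demo-1.0-SNAPSHOT-jar-with-dependencies.jar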
Note: links to the other Hudi articles are collected in the Hudi article index.