Hudi (8): Integrating Hudi with Spark in Code

Contents

0. Related Articles
1. Environment Setup
2. Inserting Data
3. Querying Data
4. Updating Data
5. Point-in-Time Query
6. Incremental Query
7. Deleting Data
8. Overwriting Data
9. Submitting and Running


Preface: Besides operating Hudi through the interactive Spark shell, you can also write your own Spark program, package it, and submit it to the cluster to run.

0. Related Articles

 Hudi Article Index 

1. Environment Setup

Create a Maven project with the following pom.xml:


    <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.2.2</spark.version>
        <hadoop.version>3.1.3</hadoop.version>
        <hudi.version>0.12.0</hudi.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- hudi-spark3.2 -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3.2-bundle_${scala.binary.version}</artifactId>
            <version>${hudi.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>

        <!-- fastjson <= 1.2.80 has known security vulnerabilities, so use 1.2.83 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- assembly packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <!-- Plugin required for Maven to compile Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2. Inserting Data

import org.apache.hudi.QuickstartUtils._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

object InsertDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val tableName = "hudi_trips_cow"
        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"
        val dataGen = new DataGenerator

        val inserts = convertToStringList(dataGen.generateInserts(10))
        val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(inserts, 2))
        df.write.format("hudi").
            options(getQuickstartWriteConfigs).
            option(PRECOMBINE_FIELD.key(), "ts").
            option(RECORDKEY_FIELD.key(), "uuid").
            option(PARTITIONPATH_FIELD.key(), "partitionpath").
            option(TBL_NAME.key(), tableName).
            mode(Overwrite).
            save(basePath)
    
        // Application finished; release resources
        sparkSession.stop()

    }

}
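
The demo above writes a COPY_ON_WRITE table, which is Hudi's default table type. As a minimal sketch (an assumption, not part of the original demo), the table type could be switched to MERGE_ON_READ by adding the table-type option from DataSourceWriteOptions; df, tableName and basePath are the values defined in InsertDemo, and the write would have to run before sparkSession.stop():

// Sketch (assumption): write a MERGE_ON_READ table instead of the default COPY_ON_WRITE.
// TABLE_TYPE / MOR_TABLE_TYPE_OPT_VAL come from org.apache.hudi.DataSourceWriteOptions,
// which InsertDemo already imports.
df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL). // default is COW_TABLE_TYPE_OPT_VAL
    option(PRECOMBINE_FIELD.key(), "ts").
    option(RECORDKEY_FIELD.key(), "uuid").
    option(PARTITIONPATH_FIELD.key(), "partitionpath").
    option(TBL_NAME.key(), tableName).
    mode(Overwrite).
    save(basePath)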

3. Querying Data


import org.apache.spark.SparkConf
import org.apache.spark.sql._

object QueryDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            //      .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

        val tripsSnapshotDF = sparkSession.
            read.
            format("hudi").
            load(basePath)

        //    Time-travel query, form 1:
        //    sparkSession.read.
        //      format("hudi").
        //      option("as.of.instant", "20210728141108100").
        //      load(basePath)
        //
        //    Time-travel query, form 2:
        //    sparkSession.read.
        //      format("hudi").
        //      option("as.of.instant", "2021-07-28 14:11:08.200").
        //      load(basePath)
        //
        //    Time-travel query, form 3: equivalent to "as.of.instant = 2021-07-28 00:00:00"
        //    sparkSession.read.
        //      format("hudi").
        //      option("as.of.instant", "2021-07-28").
        //      load(basePath)

        tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

        sparkSession
            .sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0")
            .show()

        // Release resources
        sparkSession.stop()

    }

}
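
The load above is a snapshot query, Hudi's default read mode. As a hedged sketch (not in the original code), the query type can also be set explicitly through DataSourceReadOptions; this matters mainly for MERGE_ON_READ tables, where a read-optimized view is available in addition to the snapshot view. sparkSession and basePath are the values defined in QueryDemo:

// Sketch (assumption): set the query type explicitly; "snapshot" is already the default.
// For MERGE_ON_READ tables, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL reads only the compacted base files.
import org.apache.hudi.DataSourceReadOptions._

val explicitSnapshotDF = sparkSession.read.format("hudi").
    option(QUERY_TYPE.key(), QUERY_TYPE_SNAPSHOT_OPT_VAL).
    load(basePath)
explicitSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot_explicit")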

4. Updating Data


import org.apache.hudi.QuickstartUtils._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

object UpdateDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val tableName = "hudi_trips_cow"
        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

        val dataGen = new DataGenerator
        val updates = convertToStringList(dataGen.generateUpdates(10))
        val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(updates, 2))
        df.write.format("hudi").
            options(getQuickstartWriteConfigs).
            option(PRECOMBINE_FIELD.key(), "ts").
            option(RECORDKEY_FIELD.key(), "uuid").
            option(PARTITIONPATH_FIELD.key(), "partitionpath").
            option(TBL_NAME.key(), tableName).
            mode(Append).
            save(basePath)


        //    val tripsSnapshotDF = sparkSession.
        //      read.
        //      format("hudi").
        //      load(basePath)
        //    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
        //
        //    sparkSession
        //      .sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0")
        //      .show()

        // Release resources
        sparkSession.stop()

    }

}
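
To confirm that the upsert above really produced a second commit, a quick check (not in the original code) is to look at the distinct commit times recorded in the table; after one insert and one update there should be two values. This reuses sparkSession and basePath from UpdateDemo and would run before sparkSession.stop():

// Optional check (assumption): list the distinct commit instants in the table;
// one insert plus one update should yield two distinct _hoodie_commit_time values.
sparkSession.read.format("hudi").load(basePath).
    select("_hoodie_commit_time").
    distinct().
    show(false)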

5. Point-in-Time Query

import org.apache.hudi.DataSourceReadOptions._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object PointInTimeQueryDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

        import sparkSession.implicits._

        // Register the snapshot view first; the temp view hudi_trips_snapshot does not otherwise exist in this session
        sparkSession.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

        val commits = sparkSession.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
        val beginTime = "000" // earlier than any possible commit instant
        val endTime = commits(commits.length - 2) // the second most recent commit

        val tripsIncrementalDF = sparkSession.read.format("hudi").
            option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
            option(BEGIN_INSTANTTIME.key(), beginTime).
            option(END_INSTANTTIME.key(), endTime).
            load(basePath)

        tripsIncrementalDF.createOrReplaceTempView("hudi_trips_point_in_time")

        sparkSession.
            sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").
            show()

        // Release resources
        sparkSession.stop()

    }

}
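
The incremental options above bound the query by a begin and an end instant. A related sketch (an assumption, not taken from the original) is a time-travel snapshot as of a single instant using the "as.of.instant" option, the same option shown commented out in QueryDemo. It reuses sparkSession, basePath and endTime from PointInTimeQueryDemo and would run before sparkSession.stop():

// Sketch (assumption): time-travel snapshot as of one instant; "as.of.instant" accepts
// formats such as yyyyMMddHHmmssSSS or "yyyy-MM-dd HH:mm:ss.SSS".
val asOfDF = sparkSession.read.format("hudi").
    option("as.of.instant", endTime).
    load(basePath)
asOfDF.createOrReplaceTempView("hudi_trips_as_of")
sparkSession.sql("select count(*) from hudi_trips_as_of").show()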

6. Incremental Query

import org.apache.hudi.DataSourceReadOptions._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object IncrementalQueryDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"

        import sparkSession.implicits._

        // Register the snapshot view first; the temp view hudi_trips_snapshot does not otherwise exist in this session
        sparkSession.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

        val commits = sparkSession.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
        val beginTime = commits(commits.length - 2) // start from the second most recent commit

        val tripsIncrementalDF = sparkSession.read.format("hudi").
            option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
            option(BEGIN_INSTANTTIME.key(), beginTime).
            load(basePath)

        tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

        sparkSession.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

        // Release resources
        sparkSession.stop()

    }

}

7. Deleting Data

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql._

import scala.collection.JavaConversions._

object DeleteDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val tableName = "hudi_trips_cow"
        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"
        val dataGen = new DataGenerator

        sparkSession.
            read.
            format("hudi").
            load(basePath).
            createOrReplaceTempView("hudi_trips_snapshot")

        // Row count before the delete
        val beforeCount = sparkSession.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
        println(s"rows before delete: $beforeCount")

        // Take two records whose keys will be deleted
        val ds = sparkSession.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

        val deletes = dataGen.generateDeletes(ds.collectAsList())
        val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(deletes, 2))

        df.write.format("hudi").
            options(getQuickstartWriteConfigs).
            option(OPERATION.key(), "delete").
            option(PRECOMBINE_FIELD.key(), "ts").
            option(RECORDKEY_FIELD.key(), "uuid").
            option(PARTITIONPATH_FIELD.key(), "partitionpath").
            option(TBL_NAME.key(), tableName).
            mode(Append).
            save(basePath)

        val roAfterDeleteViewDF = sparkSession.
            read.
            format("hudi").
            load(basePath)

        roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")

        // The total row count should now be 2 less than before the delete
        val afterCount = sparkSession.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
        println(s"rows after delete: $afterCount")

        // Release resources
        sparkSession.stop()
        
    }

}

8. Overwriting Data

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql._

import scala.collection.JavaConversions._

object InsertOverwriteDemo {
    def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName)
            .setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sparkSession = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()

        val tableName = "hudi_trips_cow"
        val basePath = "hdfs://hadoop1:8020/tmp/hudi_trips_cow"
        val dataGen = new DataGenerator

        sparkSession.
            read.format("hudi").
            load(basePath).
            select("uuid", "partitionpath").
            sort("partitionpath", "uuid").
            show(100, false)


        val inserts = convertToStringList(dataGen.generateInserts(10))
        val df = sparkSession.read.json(sparkSession.sparkContext.parallelize(inserts, 2)).
            filter("partitionpath = 'americas/united_states/san_francisco'")

        df.write.format("hudi").
            options(getQuickstartWriteConfigs).
            option(OPERATION.key(), "insert_overwrite").
            option(PRECOMBINE_FIELD.key(), "ts").
            option(RECORDKEY_FIELD.key(), "uuid").
            option(PARTITIONPATH_FIELD.key(), "partitionpath").
            option(TBL_NAME.key(), tableName).
            mode(Append).
            save(basePath)

        sparkSession.
            read.format("hudi").
            load(basePath).
            select("uuid", "partitionpath").
            sort("partitionpath", "uuid").
            show(100, false)

        // Release resources
        sparkSession.stop()

    }

}
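
The "insert_overwrite" operation above replaces only the partitions that appear in the incoming batch (here americas/united_states/san_francisco). A hedged variant not shown in the original is "insert_overwrite_table", which replaces the entire table; df, tableName and basePath are the values defined in InsertOverwriteDemo, and the write would run before sparkSession.stop():

// Sketch (assumption): "insert_overwrite_table" overwrites the whole table,
// not just the partitions present in the incoming DataFrame.
df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(OPERATION.key(), "insert_overwrite_table").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(RECORDKEY_FIELD.key(), "uuid").
    option(PARTITIONPATH_FIELD.key(), "partitionpath").
    option(TBL_NAME.key(), tableName).
    mode(Append).
    save(basePath)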

9. Submitting and Running

Package the code as a jar with dependencies, upload it to a directory on the cluster (here /opt/jars), and submit it with spark-submit (using QueryDemo as an example). Since the Spark, Hadoop and Hudi dependencies are declared with provided scope, the Hudi Spark bundle must already be available on the cluster's Spark classpath:

spark-submit \
    --class com.atguigu.hudi.spark.QueryDemo \
    /opt/jars/spark-hudi-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

Note: for links to other Hudi-related articles, see the Hudi Article Index.

