我是靠谱客的博主 哭泣皮皮虾,最近开发中收集的这篇文章主要介绍HUDI preCombinedField 总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

总结 HUDI preCombinedField,分两大类总结,一类是Spark SQL,这里指的是merge,因为只有merge语句中有多条记录,讨论preCombinedField才有意义;一类是Spark DF,HUDI0.9版本支持SQL建表和增删改查

总结

先说结论:

Spark DF建表写数据时(含更新):
1、UPSERT,当数据重复时(这里指同一主键对应多条记录),程序在写数据前会根据预合并字段ts进行去重,去重保留ts值最大的那条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
2、INSERT时,没有预合并,程序依次写入,实际更新为最后一条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
Spark SQL建表,写数据时(含更新):
有ts时,预合并时如果数据重复取预合并字段值最大的那条记录,最大值相同的取第一个。写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。
没有ts时,则默认将主键字段的第一个值作为预合并字段,如果数据重复,去重时会取第一个值,写数据时,直接覆盖历史数据(因为这里的预合并字段为主键字段,等于历史值,其实原理跟上面有ts时一样)

PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field

说明:
1、这里有ts代表设置了preCombinedField字段
2、hudi默认使用布隆索引,布隆索引只保证同一分区下同一个主键对应的值唯一,可以使用全局索引保证所有分区值唯一,这里不展开细说


private String getDefaultIndexType(EngineType engineType) {
switch (engineType) {
case SPARK:
return HoodieIndex.IndexType.BLOOM.name();
case FLINK:
case JAVA:
return HoodieIndex.IndexType.INMEMORY.name();
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}

3、如果在测试过程中,发现和我的结论不一致,可能和后面的注意事项有关。
4、当指定了hoodie.datasource.write.insert.drop.duplicates=true时,不管是insert还是upsert,如果存在历史数据则不更新。实际在源码中,如果为upsert,也会修改为insert。

if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS is set to be true, " +
s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
operation = WriteOperationType.INSERT
}

Spark DF

先说DF建表,DF写hudi表时,默认情况下,hudi,必须指定preCombinedField,否则,会抛出异常(当为insert等其他类型时,preCombinedField可以不用设置,具体见后面的源码解读部分),示例如下

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local[*]")
.appName("TestHuDiPreCombinedFiled")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate()
val tableName = "test_hudi_table"
val data = Array((7, "name12", 1.21, 108L, "2021-05-06"), (7, "name2", 2.22, 108L, "2021-05-06"),
(7, "name3", 3.45, 108L, "2021-05-06")
)
val df = spark.createDataFrame(data).toDF("id", "name", "price", "ts", "dt")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD.key(), "ts"). //指定preCombinedField=ts
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "dt").
option(HIVE_STYLE_PARTITIONING.key(), true). //hive 分区路径的格式是否和hive一样,如果true,则:分区字段=
option("hoodie.table.name", tableName).
//
option("hoodie.datasource.write.insert.drop.duplicates", true). //不更新
//
option(OPERATION.key(), "INSERT").
option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("append").
save(s"/tmp/${tableName}")
val read_df = spark.
read.
format("hudi").
load(s"/tmp/${tableName}" + "/*")
read_df.show()

Spark DF写数据默认OPERATION为UPSERT,当数据重复时(这里指同一主键对应多条记录),程序在写数据前会根据预合并字段ts进行去重,去重保留ts值最大的那条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
当OPERATION为INSERT时(option(OPERATION_OPT_KEY.key(), “INSERT”)),ts不是必须的,可以不设置,没有预合并,程序依次写入,实际更新为最后一条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。

附hoodie.properties:

#Properties saved on Sat Jul 10 15:08:16 CST 2021
#Sat Jul 10 15:08:16 CST 2021
hoodie.table.precombine.field=ts
hoodie.table.name=test_hudi_table
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=1
hoodie.timeline.layout.version=1
hoodie.table.partition.columns=dt

可见,hudi表元数据里有hoodie.table.precombine.field=ts,代表preCombinedField生效

SQL

SQL与DF不同,分为两种,有预合并和没有预合并

没有预合并

SQL默认没有预合并

spark.sql(
s"""
| create table ${tableName} (
|
id int,
|
name string,
|
price double,
|
ts long,
|
dt string
|) using hudi
| partitioned by (dt)
| options (
|
primaryKey = 'id',
|
type = 'cow'
| )
| location '/tmp/${tableName}'
|""".stripMargin)
spark.sql(s"show create table ${tableName}").show(false)
spark.sql(
s"""
|merge into ${tableName} as t0
|using (
|
select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
|
select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
|
select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
|
select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
| ) as s0
|on t0.id = s0.id
|when matched and opt_type!='DELETE' then update set *
|when matched and opt_type='DELETE' then delete
|when not matched and opt_type!='DELETE' then insert *
|""".stripMargin)
spark.table(tableName).show()

没有设置预合并字段值,如果数据重复,去重时会取第一个值,写数据时,直接覆盖历史数据

查看hoodie.properties和在spark.sql(s"show create table ${tableName}").show(false)打印信息里发现表的元数据信息确实没有preCombinedField,示例中虽然有ts字段,但是没有没有显示设置,当然可以直接去掉ts字段,大家可以自行测试。

有预合并

spark.sql(
s"""
| create table ${tableName} (
|
id int,
|
name string,
|
price double,
|
ts long,
|
dt string
|) using hudi
| partitioned by (dt)
| options (
|
primaryKey = 'id',
|
preCombineField = 'ts',
|
type = 'cow'
| )
| location '/tmp/${tableName}'
|""".stripMargin)
spark.sql(s"show create table ${tableName}").show(false)
spark.sql(
s"""
|merge into ${tableName} as t0
|using (
|
select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
|
select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
|
select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
|
select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
| ) as s0
|on t0.id = s0.id
|when matched and opt_type!='DELETE' then update set *
|when matched and opt_type='DELETE' then delete
|when not matched and opt_type!='DELETE' then insert *
|""".stripMargin)
spark.table(tableName).show()

SQL的唯一的区别是在建表语句中加了配置preCombineField = ‘ts’,同样可以在hoodie.properties和打印信息里查看是否有hoodie.table.precombine.field=ts信息。

预合并时如果数据重复取预合并字段值最大的那条记录,最大值相同的取第一个。写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。

SQL与DF结合

先用SQL建表,再用DF写数据。这种情况主要是想建表时不想多一列ts字段,而在预合并时可以添加一列预合并字段进行去重,因为目前的版本SQL没有实现该功能。在SQL建表时如果指定了 preCombineField = ‘ts’,则表结构中必须有ts这个字段。


val tableName = "test_hudi_table4"
spark.sql(
s"""
| create table ${tableName} (
|
id int,
|
name string,
|
price double,
|
dt string
|) using hudi
| partitioned by (dt)
| options (
|
primaryKey = 'id',
|
type = 'cow'
| )
| location '/tmp/${tableName}'
|""".stripMargin)
val data = Array((7, "name12", 1.21, 106L, "2021-05-06"), (7, "name2", 2.22, 108L, "2021-05-06"),
(7, "name3", 3.45, 107L, "2021-05-06")
)
val df = spark.createDataFrame(data).toDF("id", "name", "price", "ts", "dt")
df.show()
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD.key(), "ts"). //指定preCombinedField=ts
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "dt").
option(HIVE_STYLE_PARTITIONING.key(), true). //hive 分区路径的格式是否和hive一样,如果true,则:分区字段=
option("hoodie.table.name", tableName).
//
option("hoodie.datasource.write.insert.drop.duplicates", true). //不更新
//
option(OPERATION.key(), "INSERT").
option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
save(s"/tmp/${tableName}")
val read_df = spark.
read.
format("hudi").
load(s"/tmp/${tableName}" + "/*")
read_df.show()
spark.table(tableName).show()

上面的程序主要是用SQL先建了表的元数据,然后再用程序指定了PRECOMBINE_FIELD_OPT_KEY=ts,这样就实现了既可以预合并去重,也不用在建表中指定ts字段。但是打印中发现用程序读parquet文件时多了ts列,读表时因为元数据里没有ts列,没有打印出来,实际文件存储的有ts这一列。

上面只是模拟了这一场景,而我们想实现的时下面的

spark.sql(
s"""
| create table ${tableName} (
|
id int,
|
name string,
|
price double,
|
dt string
|) using hudi
| partitioned by (dt)
| options (
|
primaryKey = 'id',
|
preCombineField = 'ts',
|
type = 'cow'
| )
| location '/tmp/${tableName}'
|""".stripMargin)
spark.sql(
s"""
|merge into ${tableName} as t0
|using (
|
select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
|
select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
|
select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
|
select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
| ) as s0
|on t0.id = s0.id
|when matched and opt_type!='DELETE' then update set *
|when matched and opt_type='DELETE' then delete
|when not matched and opt_type!='DELETE' then insert *
|""".stripMargin)

在建表时指定了preCombineField = ‘ts’,但是表结构中没有ts字段,而且后面的merge sql拼接时添加这一列。目前master分支还不支持这种情况,如果想实现这一情况,可以自己尝试修改源码支持。

代码

示例代码已上传到gitee,由于公司网把github屏蔽了,以后暂时转到gitee上。

源码解读

解读部分源码

更新:2021-09-26,因为0.9.0版本已发布,故更新源代码解析,可能存在部分源代码没有更新

程序写hudi时ts的必须性

默认配置时,如果不指定PRECOMBINE_FIELD_OPT_KEY,则会抛出以下异常:

21/07/13 20:04:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record. Acceptable fields were :[id, name, price, tss, dt]
at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:437)
at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:233)
at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:230)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

对应的源码:
HoodieSparkSqlWriter.scala 230、233行

230	val hoodieAllIncomingRecords = genericRecords.map(gr => {
val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
val hoodieRecord = if (shouldCombine) {
233	val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(processedRecord,
orderingVal, keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS_NAME))
} else {
DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
}
hoodieRecord
}).toJavaRDD()

getNestedFieldVal

public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) {
String[] parts = fieldName.split("\.");
......
if (returnNullIfNotFound) {
return null;
434 } else if (valueNode.getSchema().getField(parts[i]) == null) {
throw new HoodieException(
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
437
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
} else {
throw new HoodieException("The value of " + parts[i] + " can not be null");
}

233行,如果shouldCombinetrue,则会调用getNestedFieldVal,并将PRECOMBINE_FIELD_OPT_KEY的值作为fieldName参数传给getNestedFieldVal,而在getNestedFieldVal的434行发现当PRECOMBINE_FIELD_OPT_KEY的值null时抛出上面的异常。

可以发现当shouldCombine==true,才会调用getNestedFieldVal,才会抛出该异常,而shouldCombine何时为true呢

val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean

当INSERT_DROP_DUPS为true或者操作类型为UPSERT时,shouldCombine为true,默认的INSERT_DROP_DUPS=false


/**
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
val INSERT_DROP_DUPS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.insert.drop.duplicates")
.defaultValue("false")
.withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")

也就是默认情况下,upsert操作,ts是必须的,而insert等其他操作可以没有ts值。这样我们就可以根据实际情况灵活运用了。

注意

用SQL创建新表或者DF append模式创建新表时,如果对应的数据目录已存在,需要先将文件夹删掉,因为hoodie.properties里保存了表的元数据信息,程序里会根据文件信息判断表是否存在,如果存在,会复用旧表的元数据。这种情况存在于想用同一个表测试上面多种情况

最后

以上就是哭泣皮皮虾为你收集整理的HUDI preCombinedField 总结的全部内容,希望文章能够帮你解决HUDI preCombinedField 总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部