我是靠谱客的博主 醉熏马里奥,最近开发中收集的这篇文章主要介绍spark处理hudi增量数据并进行聚合操作———附带详细思路和代码1 背景与结果2 方法3 代码4 附送启动pyspark shell的脚本,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
文章目录
- 1 背景与结果
- 2 方法
- 3 代码
- 3.1 法1(创建临时表,执行sql)
- 3.2 法2(使用dataframe接口执行sql)
- 3.3 法3(join操作得到增量数据后,再执行聚合)
- 4 附送启动pyspark shell的脚本
1 背景与结果
因为如果每次都对全量数据进行聚合操作,将是非常耗费时间的一件事,但是如果我们只处理与增量数据相关的数据,那处理的数据量将大大减少,程序运行的处理时间将会大大缩短。
我们对三种方法,在原数据10865条,增量数据2800条(总共包含900种聚类后种类)的条件下,进行测试,得到
- 法1的处理时间:437.8s;
- 法2的处理时间:471.3s;
- 法3的处理时间:6.2 s
2 方法
这里使用三种方法处理增量:
- 使用增量数据dataframe得到每条增量数据(list格式)
- 法1:使用总表(包含增量数据)的
dataframe
创建临时镜像表,对每条增量数据在临时镜像表进行聚合操作; - 法2:对每条增量数据对总表
dataframe
进行聚合操作;
- 法1:使用总表(包含增量数据)的
- 法3:对增量数据和全量数据做join处理,得到全量数据中与增量数据关联的全部变化的数据,然后对这部分数据进行聚合操作。
3 代码
得到全量数据:
read_hudi_file_path = '/test_hudi/PROD/hadoop/ods/fyk_test_02_fyk_test_02/fyk_test_02/mysql_10.20.3.88_3306_mf_test4/jk8_mor/*/*/*/*'
original_tablet_DF = spark.
read.
format("hudi").
load(read_hudi_file_path)
columns_to_drop =['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key', '_hoodie_partition_path', '_hoodie_file_name', 'hudi_delta_streamer_ingest_date', 'hudi_delta_streamer_update_time', '_hoodie_is_deleted']
original_tablet_DF = original_tablet_DF.drop(*columns_to_drop)
得到增量数据:
write_incremental_begin_time = '20220119171247'
read_hudi_file_base_path = '/test_hudi/PROD/hadoop/ods/fyk_test_02_fyk_test_02/fyk_test_02/mysql_10.20.3.88_3306_mf_test4/jk8_mor/'
incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': write_incremental_begin_time,
}
incremental_table_df = spark.read.format("hudi").options(**incremental_read_options).load(read_hudi_file_base_path)
incremental_table_df = incremental_table_df.drop(*columns_to_drop)
3.1 法1(创建临时表,执行sql)
得到每条增量数据(法1和法2共用):
# 去重
aggregate_field = ['sub_trans_code']
incremental_df = incremental_table_df.dropDuplicates((aggregate_field)).select(*aggregate_field)
# 存储每条的数据
detail_data_aggregate_list = list()
for d in incremental_df.collect():
d = list(d)
detail_data_aggregate_list.append(d)
print('使用list存储每条数据:')
print(detail_data_aggregate_list[0])
agg_params = dict()
agg_params = {'sub_total_trans_cost': 'sum', 'sub_total_trans_price': 'sum'}
使用sql语句,处理dataframe:
write_data_list = list()
write_title_list = list()
for i in range(len(detail_data_aggregate_list)):
for j in range(len(aggregate_field)):
if j != 0:
if isinstance(detail_data_aggregate_list[i][j], str):
filter_statement += f" AND {aggregate_field[j]}='{detail_data_aggregate_list[i][j]}'"
else:
filter_statement += f" AND {aggregate_field[j]}={detail_data_aggregate_list[i][j]}"
else:
if isinstance(detail_data_aggregate_list[i][j], str):
filter_statement = f"{aggregate_field[j]}='{detail_data_aggregate_list[i][j]}'"
else:
filter_statement = f"{aggregate_field[j]}={detail_data_aggregate_list[i][j]}"
print(filter_statement)
temp_write_df = original_tablet_DF.filter(filter_statement).groupBy(*aggregate_field).agg(agg_params)
write_data_list.append(tuple(list(temp_write_df.collect()[0]))) # 法2(先使用list存储,然后把再转换为dataframe)
# if i != 0:# 得到dataframe【方法1】(创建写入的dataframe,速度慢不推荐)
#
write_df = temp_write_df.union(write_df)
# else:
#
write_df = temp_write_df
# unionAll/union是返回两个数据集的并集,包括重复行
# Intersect是返回两个数据集的交集,不包括重复行
# Minus是返回两个数据集的差集,不包括重复行
# 得到dataframe【方法2】
write_title_list.extend(aggregate_field)
accumulate_field = ['sub_total_trans_cost', 'sub_total_trans_price']
write_title_list.extend(accumulate_field)
write_df = self._spark.createDataFrame(write_data_list, write_title_list)
3.2 法2(使用dataframe接口执行sql)
original_tablet_DF.createOrReplaceTempView('original_table_name')
aggregate_field_list_str = ','.join([aggregate_field[i] for i in range(0, len(aggregate_field))])
write_data_list = list()
for i in range(len(detail_data_aggregate_list)):
sql_statement = f"SELECT {aggregate_field_list_str}"
sql_statement += ",SUM(sub_total_trans_cost), SUM(sub_total_trans_price) "
sql_statement += f" FROM original_table_name WHERE"
for j in range(len(aggregate_field)):
if j != 0:
if isinstance(detail_data_aggregate_list[i][j], str):
sql_statement += " AND {0}='{1}'".format(aggregate_field[j], detail_data_aggregate_list[i][j])
else:
sql_statement += " AND {0}={1}".format(aggregate_field[j], detail_data_aggregate_list[i][j])
else:
if isinstance(detail_data_aggregate_list[i][j], str):
sql_statement += " {0}='{1}'".format(aggregate_field[j], detail_data_aggregate_list[i][j])
else:
sql_statement += " {0}={1}".format(aggregate_field[j], detail_data_aggregate_list[i][j])
sql_statement += f" GROUP BY {aggregate_field_list_str}"
print(sql_statement)
write_data_list.append(tuple(list(spark.sql(sql_statement).collect()[0])))
print('write_data_list:')
print(write_data_list)
write_title_list = list()
write_title_list.extend(self._aggregate_field)
accumulate_field = ['sub_total_trans_cost', 'sub_total_trans_price']
write_title_list.extend(accumulate_field)
# print(write_title_list)
write_df = spark.createDataFrame(write_data_list, write_title_list)
write_df.show(5)
print(write_df.count())
3.3 法3(join操作得到增量数据后,再执行聚合)
# join
write_df = original_tablet_DF.join(incremental_df, aggregate_field, 'left_semi')
write_df = write_df.groupBy(*aggregate_field).agg(agg_params)
4 附送启动pyspark shell的脚本
#!/usr/bin/env bash
/software/spark-3.1.2-bin-hadoop2.7/bin/pyspark --jars /root/yl/hudi-0.10.0/docker/hoodie/hadoop/hive_base/target/hoodie-spark-bundle.jar --driver-class-path /software/hadoop-2.10.1/etc/hadoop:/software/apache-hive-2.3.8-bin/conf/:/software/member/config/mysql-connector-java-5.1.49-bin.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 3 --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
最后
以上就是醉熏马里奥为你收集整理的spark处理hudi增量数据并进行聚合操作———附带详细思路和代码1 背景与结果2 方法3 代码4 附送启动pyspark shell的脚本的全部内容,希望文章能够帮你解决spark处理hudi增量数据并进行聚合操作———附带详细思路和代码1 背景与结果2 方法3 代码4 附送启动pyspark shell的脚本所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复