概述
文章目录
- 0 背景
- 1 代码原理
- 1.1 基本准备
- 1.2 遍历文件目录
- 2 代码
0 背景
由于每次读取hudi文件时,都需要加上/*/*
等相对路径,这就需要每次都使用Hadoop指令去查询parquet所在的层数,十分繁琐,于是编写了自动寻找.parquet
的文件以添加/*
。
注意⚠️:此方法分只是用于读取目录下全部分区数据,如果需要读取某个分区的文件,还是需要使用/*
方法。
1 代码原理
1.1 基本准备
因为pyspark功能实现的底层实现使用的是scala,而scala底层实现的是java,所以pyspark功能的底层实现实际上调用的是java。因此,这里使用py4j来实现对java代码的调用,而py4j已经被封装在了spark.sparkContext._jvm
中。
使用spark.sparkContext._jvm
引入对Hadoop处理的包(例如,org.apache.hadoop.fs.FileSystem
)。使用Hadoop包得到Hadoop路径下的文件/文件夹路径,判断是否是文件或者文件夹,判断路径是否存在。
sc = spark.sparkContext # 得到spark的运行环境参数
FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem # 一个通用的文件系统API,提供了不同文件系统的统一访问方式(https://hadoop.apache.org/docs/r3.3.1/api/org/apache/hadoop/fs/FileSystem.html)
Path = sc._jvm.org.apache.hadoop.fs.Path # Hadoop文件系统中统一的文件或目录描述,类似于java.io.File对本地文件系统的文件或目录描述(https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Path.html)
# 返回配置的 FileSystem 实现。
fs = FileSystem.get(sc._jsc.hadoopConfiguration())
# 判读是否存在
if fs.exists(Path(hdfs_path)) == False: pass
# 得到路径
fs.listStatus(Path(folder_path)).getPath()
1.2 遍历文件目录
这里使用的类似于树的层序遍历(其实文件系统,就是一个树形结构),使用队列存储将要遍历结点的双亲结点信息,如果是目录,才放入到队列中,进行后续的遍历,而目录所在的层级相当于树的深度,根结点算0层。由于一般的层序遍历都是遍历该层最右边的结点后,才累加当前的层高,但是这里需要找到.parquet
后,立即返回目录层数,因此这里提前设初始的层数1(相当于每个结点都是带着自己的层数),同时多增加了一个is_find_parquet_file
变量,来防止树只有一层且第一层没有找到结点的情况下还返回1.
为了防止遍历时间过长,对目录下的文件遍历数量进行了限制,当遍历数量超过限制的文件个数,就终止对当前目录的遍历(此值可以自行调整)。
查询是否是.parquet
,既可以使用直接用最后8位是否等于.parquet
,也可以使用正则表达式来进行匹配。
2 代码
def __getFileParquetLayer(spark, hdfs_path):
'''
:param hdfs_path: /test_hudi/PROD/hadoop/ods/fyk_test_02_fyk_test_02/fyk_test_02/mysql_10.20.3.88_3306_mf_test22/test_agg_res7
:param spark:
:return:
'''
sc = spark.sparkContext
FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
Path = sc._jvm.org.apache.hadoop.fs.Path
fs = FileSystem.get(sc._jsc.hadoopConfiguration())
if fs.exists(Path(hdfs_path)) == False: # 判断文件路径是否存在
return 0
else:
folder_queue = [] # 文件夹队列
folder_queue.append(hdfs_path)
file_traverse_count = 5 # 控制寻找目录下的文件数量,防止遍历时间过长
file_layer = 1 # 起始层数为1
is_find_parquet_file = False
last_folder_path = hdfs_path # 保存下一层最后一个节点
while len(folder_queue):
folder_path = folder_queue.pop(0)
status = fs.listStatus(Path(folder_path))
file_count = 0 # 控制寻找目录下的文件数量,防止遍历时间过长
for file_status in status:
file_count += 1
# print(f"file_count:{file_count}")
if file_count >= file_traverse_count:
break
else:pass
tmp_path = file_status.getPath()
tmp_str_path = str(tmp_path)
# print(tmp_str_path) # 打印遍历的文件
# re.search(r".parquet$", str) # import re #
if len(tmp_str_path) > 8:
if tmp_str_path[-8:] == '.parquet':
print(f"parquet文件路径:{tmp_str_path}")
print(f'文件路径深度:{file_layer}')
is_find_parquet_file = True
break
else:
if fs.isDirectory(tmp_path): # fs.isFile(Path(hdfs_path))
folder_queue.append(tmp_str_path)
else:pass
else:
if fs.isDirectory(tmp_path): #判断是否是文件夹
folder_queue.append(tmp_str_path)
else:pass
if is_find_parquet_file:
break
else:
pass
if folder_path == last_folder_path: # 处理该层最后一个节点
file_layer += 1
if len(folder_queue) > 0:
last_folder_path = folder_queue[-1]
else:pass
else:pass
if is_find_parquet_file:
return file_layer
else:
return 0
最后
以上就是寒冷学姐为你收集整理的spark在hdfs上自动寻找指定后缀的文件(使用层序遍历)———附带详细思路和代码0 背景1 代码原理2 代码的全部内容,希望文章能够帮你解决spark在hdfs上自动寻找指定后缀的文件(使用层序遍历)———附带详细思路和代码0 背景1 代码原理2 代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复