我是靠谱客的博主 寒冷学姐,最近开发中收集的这篇文章主要介绍spark在hdfs上自动寻找指定后缀的文件(使用层序遍历)———附带详细思路和代码0 背景1 代码原理2 代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 0 背景
  • 1 代码原理
    • 1.1 基本准备
    • 1.2 遍历文件目录
  • 2 代码

0 背景

由于每次读取hudi文件时,都需要加上/*/*等相对路径,这就需要每次都使用Hadoop指令去查询parquet所在的层数,十分繁琐,于是编写了自动寻找.parquet的文件以添加/*

注意⚠️:此方法分只是用于读取目录下全部分区数据,如果需要读取某个分区的文件,还是需要使用/*方法。

1 代码原理

1.1 基本准备

因为pyspark功能实现的底层实现使用的是scala,而scala底层实现的是java,所以pyspark功能的底层实现实际上调用的是java。因此,这里使用py4j来实现对java代码的调用,而py4j已经被封装在了spark.sparkContext._jvm中。

使用spark.sparkContext._jvm引入对Hadoop处理的包(例如,org.apache.hadoop.fs.FileSystem)。使用Hadoop包得到Hadoop路径下的文件/文件夹路径,判断是否是文件或者文件夹,判断路径是否存在。

    sc = spark.sparkContext # 得到spark的运行环境参数
    
    FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem # 一个通用的文件系统API,提供了不同文件系统的统一访问方式(https://hadoop.apache.org/docs/r3.3.1/api/org/apache/hadoop/fs/FileSystem.html)
    
    Path = sc._jvm.org.apache.hadoop.fs.Path # Hadoop文件系统中统一的文件或目录描述,类似于java.io.File对本地文件系统的文件或目录描述(https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Path.html)
    
    # 返回配置的 FileSystem 实现。
    fs = FileSystem.get(sc._jsc.hadoopConfiguration()) 
	
	# 判读是否存在
	if fs.exists(Path(hdfs_path)) == False: pass
	
	# 得到路径
	fs.listStatus(Path(folder_path)).getPath()

1.2 遍历文件目录

这里使用的类似于树的层序遍历(其实文件系统,就是一个树形结构),使用队列存储将要遍历结点的双亲结点信息,如果是目录,才放入到队列中,进行后续的遍历,而目录所在的层级相当于树的深度,根结点算0层。由于一般的层序遍历都是遍历该层最右边的结点后,才累加当前的层高,但是这里需要找到.parquet后,立即返回目录层数,因此这里提前设初始的层数1(相当于每个结点都是带着自己的层数),同时多增加了一个is_find_parquet_file变量,来防止树只有一层且第一层没有找到结点的情况下还返回1.

为了防止遍历时间过长,对目录下的文件遍历数量进行了限制,当遍历数量超过限制的文件个数,就终止对当前目录的遍历(此值可以自行调整)。

查询是否是.parquet,既可以使用直接用最后8位是否等于.parquet,也可以使用正则表达式来进行匹配。

2 代码

def __getFileParquetLayer(spark, hdfs_path):
    '''
    :param hdfs_path: /test_hudi/PROD/hadoop/ods/fyk_test_02_fyk_test_02/fyk_test_02/mysql_10.20.3.88_3306_mf_test22/test_agg_res7
    :param spark:
    :return:
    '''

    sc = spark.sparkContext
    FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
    Path = sc._jvm.org.apache.hadoop.fs.Path
    fs = FileSystem.get(sc._jsc.hadoopConfiguration())

    if fs.exists(Path(hdfs_path)) == False: # 判断文件路径是否存在
        return 0
    else:
        folder_queue = [] # 文件夹队列
        folder_queue.append(hdfs_path)
        file_traverse_count = 5 # 控制寻找目录下的文件数量,防止遍历时间过长
        file_layer = 1 # 起始层数为1
        is_find_parquet_file = False
        last_folder_path = hdfs_path # 保存下一层最后一个节点
        while len(folder_queue):
            folder_path = folder_queue.pop(0)
            status = fs.listStatus(Path(folder_path))
            file_count = 0 # 控制寻找目录下的文件数量,防止遍历时间过长
            for file_status in status:
                file_count += 1
                # print(f"file_count:{file_count}")
                if file_count >= file_traverse_count:
                    break
                else:pass

                tmp_path = file_status.getPath()
                tmp_str_path = str(tmp_path)
                # print(tmp_str_path) # 打印遍历的文件
                # re.search(r".parquet$", str) # import re #
                if len(tmp_str_path) > 8:
                    if tmp_str_path[-8:] == '.parquet':
                        print(f"parquet文件路径:{tmp_str_path}")
                        print(f'文件路径深度:{file_layer}')
                        is_find_parquet_file = True
                        break
                    else:
                        if fs.isDirectory(tmp_path): # fs.isFile(Path(hdfs_path))
                            folder_queue.append(tmp_str_path)
                        else:pass
                else:
                    if fs.isDirectory(tmp_path): #判断是否是文件夹
                        folder_queue.append(tmp_str_path)
                    else:pass

            if is_find_parquet_file:
                break
            else:
                pass

            if folder_path == last_folder_path: # 处理该层最后一个节点
                file_layer += 1
                if len(folder_queue) > 0:
                    last_folder_path = folder_queue[-1]
                else:pass
            else:pass

        if is_find_parquet_file:
            return file_layer
        else:
            return 0

最后

以上就是寒冷学姐为你收集整理的spark在hdfs上自动寻找指定后缀的文件(使用层序遍历)———附带详细思路和代码0 背景1 代码原理2 代码的全部内容,希望文章能够帮你解决spark在hdfs上自动寻找指定后缀的文件(使用层序遍历)———附带详细思路和代码0 背景1 代码原理2 代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部