我是靠谱客的博主 隐形柜子,最近开发中收集的这篇文章主要介绍hive上的小文件使用spark进行合并,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

hive上的小文件使用spark进行合并

引言:我们的一些实时处理的文件再不同批次下生成的文件大小不一致,
有时会产生大量的小文件,对于我们后期的数据处理影响极大,因此需要将小文件合并再save成大小均为128M的文件更为合适。
原理:首先读取该目录下的文件大小,通过计算整个文件夹所占的空间去利用spark有多少个分区就会生成几个文件夹的特性去调整写入的分区个数。

首先先读取文件的大小:利用hdfs自带的文件管理系统去读取。
fs = FileSystem.get(sc.hadoopConfiguration)

 val fs = FileSystem.get(sc.hadoopConfiguration)
val dirSize = fs.getContentSummary(new Path(lastInputPath)).getLength

然后计算重新合并后应该生成的文件个数:
((dirSize / (blockSize * 1024 * 1024)) + 1).toInt

 val fileNum: Int = ((dirSize / (blockSize * 1024 * 1024)) + 1).toInt

再读取所有文件,重新分区,保存即可!
.repartition!!!


spark.read.parquet(lastInputPath + "/*").repartition(fileNum).write.mode(SaveMode.Overwrite).parquet(lastOutputPath)

(小编这里用的是parquet的方式读写的。另外你们在用的时候切记压缩格式,不然生成的文件大小将会不一致!)
入口设置压缩格式:

spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

最后

以上就是隐形柜子为你收集整理的hive上的小文件使用spark进行合并的全部内容,希望文章能够帮你解决hive上的小文件使用spark进行合并所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(65)

评论列表共有 0 条评论

立即
投稿
返回
顶部