我是靠谱客的博主 迷人小虾米,最近开发中收集的这篇文章主要介绍spark 显示hdfs 路径_Spark读取数据的同时;获取数据所在的HDFS路径,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

常规的Spark读取文件函数为textFile,该函数只会返回文件的内容;而使用hadoopFile会将partition的一些属性也存放在RDD中!

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)

通过mapPartitionsWithInputSplit函数其中一个参数InputSplit能快速获取iterator所在文件路径、长度等信息

hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)])

源码:

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.Put

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.io.{LongWritable, Text}

import org.apache.hadoop.mapred.{FileSplit, InputSplit, JobConf, TextInputFormat}

import org.apache.spark.rdd.HadoopRDD

import org.apache.spark.sql.SparkSession

import scala.util.parsing.json.JSONObject

object GeneratedIndex {

def main(args: Array[String]): Unit = {

val sc = SparkSession.builder().master("yarn").getOrCreate().sparkContext

val input = args(0)

val jvpDate = args(1)

val tableName = args(2)

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)

val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

// 获取数据的归属文件路径

val fileAdnLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {

val file = inputSplit.asInstanceOf[FileSplit]

val fp = file.getPath.toString() //数据所在的HDFS路径

val start = fp.lastIndexOf("-")

val end = fp.lastIndexOf(".")

iterator.map(x => {

val lines = x._2.toString.split("\|") // 分割读取数据

val uid = lines(2)

val pid = lines(5).substring(0, 3)

val map = List((pid, fp.slice(start + 1, end)))

(uid, map)

})

})

}

最后

以上就是迷人小虾米为你收集整理的spark 显示hdfs 路径_Spark读取数据的同时;获取数据所在的HDFS路径的全部内容,希望文章能够帮你解决spark 显示hdfs 路径_Spark读取数据的同时;获取数据所在的HDFS路径所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部