我是靠谱客的博主 鳗鱼冬日,最近开发中收集的这篇文章主要介绍spark一行转多行操作并存储文件到hdfs为parquet格式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

spark一行转多行操作并存储文件到hdfs为parquet格式

      • object Run:
      • trait SparkUtils:(这里只是构建sparksession实例并返回,可自行创建)
      • 实现结果:
    • 注意:

直接上代码了,有不足之处,望大佬指正。

object Run:

object Run extends SparkUtils{
def main(args: Array[String]): Unit = {
var flag = true
var ip1 = ""
var ip2 = ""
var nextip = ""
val spark = getSparkSession(false, getClass.getSimpleName)
val sc = spark.sparkContext
val day = args(0) //"20200331"
import spark.implicits._
spark.read.textFile(s"hdfs://10.11.11.55:8020/dw/tmp/tmp_ip_city/${day}")
.filter(_.split(",").length>=13)
.flatMap(line=>{
var arr = new ArrayBuffer[ip_city]()
val word = line.split(",")
flag = true
ip1 = word(1)
ip2 = word(2)
nextip = ip1
while (flag){
if (nextip.equals(ip2)) {
//if放在后面可能死循环
比如存在 ip1=ip2 的情况
flag = false
}
arr += ip_city(word(0),nextip,IpUtil.ipToLong(nextip).toString, word(5),
word(6), word(7), word(8), word(9), word(10), word(11), word(12), word(13))
nextip = IpUtil.getNextIp(nextip)
}
arr
}).toDF.write.parquet(s"hdfs://10.11.11.55:8020/dw/ods/ods_persona_ipv4_city/event_day=${day}")
println("处理完一批。。。")
}
}
//若要映射到hive表这里切记与hive表数据对应,否则hive无法转换parquet格式
case class ip_city(
mysqlid:String,
ip:String,
ip_long:String,
province_name:String,
province_code:String,
city_name:String,
city_code:String,
county_name:String,
county_code:String,
carrier_name:String,
longitude:String,
latitude:String
)

trait SparkUtils:(这里只是构建sparksession实例并返回,可自行创建)

trait SparkUtils
{
protected def getSparkSession(isLocal:Boolean,appName:String): SparkSession = {
var spark:SparkSession = null
val conf = new SparkConf()
.set("spark.default.parallelism", "1000")
.set("spark.storage.memoryFraction", "0.5")
.set("spark.shuffle.consolidateFiles", "true")
.set("spark.shuffle.file.buffer", "64")
.set("spark.network.timeout","1000")
.set("spark.sql.planner.skewJoin","true")
.set("spark.shuffle.memoryFraction", "0.3")
.set("spark.reducer.maxSizeInFlight", "24")
.set("spark.shuffle.io.maxRetries", "3")
.set("spark.shuffle.io.retryWait", "20")
//.set("spark.broadcast.blockSize","1024m")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//
.set("spark.scheduler.executorTaskBlacklistTime", "60000")
//
.set("spark.task.maxFailures", "10")
if(isLocal){
spark = SparkSession.builder.appName(appName)
.master("local")
//.enableHiveSupport()
.config(conf)
.getOrCreate()
}else{
spark = SparkSession.builder.appName(appName)
.enableHiveSupport()
.config(conf)
.getOrCreate()
}
spark
}
}

实现结果:

输入的数据格式:(数据量较少存储格式为text)

idip1ip2
11.0.1.01.0.1.5
21.22.2.11.22.2.2

输出的数据格式:(因为一变多导致数据量增大,所以输出的存储格式转换为parquet)

idip
11.0.1.0
11.0.1.1
11.0.1.2
11.0.1.3
11.0.1.4
11.0.1.5
21.22.2.1
21.22.2.2

注意:

转换成case class ip_city是为了为将
DataFrame类型的数据指定schema信息。

最后

以上就是鳗鱼冬日为你收集整理的spark一行转多行操作并存储文件到hdfs为parquet格式的全部内容,希望文章能够帮你解决spark一行转多行操作并存储文件到hdfs为parquet格式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部