概述
spark一行转多行操作并存储文件到hdfs为parquet格式
- object Run:
- trait SparkUtils:(这里只是构建sparksession实例并返回,可自行创建)
- 实现结果:
- 注意:
直接上代码了,有不足之处,望大佬指正。
object Run:
object Run extends SparkUtils{
def main(args: Array[String]): Unit = {
var flag = true
var ip1 = ""
var ip2 = ""
var nextip = ""
val spark = getSparkSession(false, getClass.getSimpleName)
val sc = spark.sparkContext
val day = args(0) //"20200331"
import spark.implicits._
spark.read.textFile(s"hdfs://10.11.11.55:8020/dw/tmp/tmp_ip_city/${day}")
.filter(_.split(",").length>=13)
.flatMap(line=>{
var arr = new ArrayBuffer[ip_city]()
val word = line.split(",")
flag = true
ip1 = word(1)
ip2 = word(2)
nextip = ip1
while (flag){
if (nextip.equals(ip2)) {
//if放在后面可能死循环
比如存在 ip1=ip2 的情况
flag = false
}
arr += ip_city(word(0),nextip,IpUtil.ipToLong(nextip).toString, word(5),
word(6), word(7), word(8), word(9), word(10), word(11), word(12), word(13))
nextip = IpUtil.getNextIp(nextip)
}
arr
}).toDF.write.parquet(s"hdfs://10.11.11.55:8020/dw/ods/ods_persona_ipv4_city/event_day=${day}")
println("处理完一批。。。")
}
}
//若要映射到hive表这里切记与hive表数据对应,否则hive无法转换parquet格式
case class ip_city(
mysqlid:String,
ip:String,
ip_long:String,
province_name:String,
province_code:String,
city_name:String,
city_code:String,
county_name:String,
county_code:String,
carrier_name:String,
longitude:String,
latitude:String
)
trait SparkUtils:(这里只是构建sparksession实例并返回,可自行创建)
trait SparkUtils
{
protected def getSparkSession(isLocal:Boolean,appName:String): SparkSession = {
var spark:SparkSession = null
val conf = new SparkConf()
.set("spark.default.parallelism", "1000")
.set("spark.storage.memoryFraction", "0.5")
.set("spark.shuffle.consolidateFiles", "true")
.set("spark.shuffle.file.buffer", "64")
.set("spark.network.timeout","1000")
.set("spark.sql.planner.skewJoin","true")
.set("spark.shuffle.memoryFraction", "0.3")
.set("spark.reducer.maxSizeInFlight", "24")
.set("spark.shuffle.io.maxRetries", "3")
.set("spark.shuffle.io.retryWait", "20")
//.set("spark.broadcast.blockSize","1024m")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//
.set("spark.scheduler.executorTaskBlacklistTime", "60000")
//
.set("spark.task.maxFailures", "10")
if(isLocal){
spark = SparkSession.builder.appName(appName)
.master("local")
//.enableHiveSupport()
.config(conf)
.getOrCreate()
}else{
spark = SparkSession.builder.appName(appName)
.enableHiveSupport()
.config(conf)
.getOrCreate()
}
spark
}
}
实现结果:
输入的数据格式:(数据量较少存储格式为text)
id | ip1 | ip2 | … |
---|---|---|---|
1 | 1.0.1.0 | 1.0.1.5 | … |
2 | 1.22.2.1 | 1.22.2.2 | … |
输出的数据格式:(因为一变多导致数据量增大,所以输出的存储格式转换为parquet)
id | ip | … |
---|---|---|
1 | 1.0.1.0 | … |
1 | 1.0.1.1 | … |
1 | 1.0.1.2 | … |
1 | 1.0.1.3 | … |
1 | 1.0.1.4 | … |
1 | 1.0.1.5 | … |
2 | 1.22.2.1 | … |
2 | 1.22.2.2 | … |
注意:
转换成case class ip_city是为了为将
DataFrame类型的数据指定schema信息。
最后
以上就是鳗鱼冬日为你收集整理的spark一行转多行操作并存储文件到hdfs为parquet格式的全部内容,希望文章能够帮你解决spark一行转多行操作并存储文件到hdfs为parquet格式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复