我是靠谱客的博主 负责柠檬,最近开发中收集的这篇文章主要介绍Spark多级目录输出,且输出不同值,重写MultipleTextOutputFormat,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
- 数据结构
www.alibaba.com 222.38.194.133 440887 [09/20/2019 14:33:52 +0800] 河北 秦皇岛 铁通 2019 09 20
www.baidu.com 171.13.146.53 654665 [09/20/2019 14:33:52 +0800] 河南 焦作 电信 2019 09 20
www.baidu.com 139.207.106.29 722052 [09/20/2019 14:33:52 +0800] 四川 电信 2019 09 20
www.alibaba.com 182.91.131.22 22103 [09/20/2019 14:33:52 +0800] 广西 桂林 联通 2019 09 20
www.huawei.com 171.12.12.85 155927 [09/20/2019 14:33:52 +0800] 河南 郑州 电信 2019 09 20
//"""多目录输出:依据不同的key输出不同的数据
// www.alibaba.com: ip,流量,省,地址
182.86.190.207 2271 江西 景德镇
// www.baidu.com:ip,流量,省,时间
182.86.190.207 2271 江西 [09/20/2019 14:33:52 +0800]
// www.huawei.com:ip,流量,省,地址,所属营业厅
182.86.190.207 2271 江西 景德镇 电信
//
//"""
代码实现
val dataRdd = sc.textFile("data/etl_access.log")
.filter(x=>x.split("t").length==10)
.map(x=> {
val splits = x.split("t")
val domain = splits(0)
val ip = splits(1)
val flow = splits(2)
val pro = splits(4)
val city = splits(5)
val isp = splits(6)
val year = splits(7)
val month = splits(8)
val day = splits(9)
(domain,(domain,ip,flow,pro,city,isp,year,month,day))
}).coalesce(1,true)
.saveAsHadoopFile(outpath,classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat] )
//注:saveAsHadoopFile 必须是key-value类型 ;saveAsTextFiled底层也是调用saveAsHadoopFile方法的
// 重写 MultipleTextOutputFormat 方法:
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any,Any]{
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
//已key作为分目录
val fliename = key+"/"+name
fliename
}
override def generateActualKey(key: Any, value: Any): Any = {
//输出数据 不需要key
NullWritable.get()
}
override def generateActualValue(key: Any, value: Any): Any = {
//根据不同的key ,输出不同的value 待优化--->可以从数据库获取想要输出的value
val splits = value.toString.split(",")
val sb = new StringBuffer
key match {
case "www.ruozedata.com" =>{
sb.append(splits(0)).append("t")
.append(splits(1)).append("t")
.append(splits(3))
sb
}
case "www.baidu.com" =>{
sb.append(splits(0)).append("t")
.append(splits(1)).append("t")
.append(splits(2))
sb
}
case "www.alibaba.com" =>{
sb.append(splits(0)).append("t")
.append(splits(2)).append("t")
.append(splits(3)).append("t")
.append(splits(4))
sb
}
case _ =>NullWritable.get()
}
}
}
最后
以上就是负责柠檬为你收集整理的Spark多级目录输出,且输出不同值,重写MultipleTextOutputFormat的全部内容,希望文章能够帮你解决Spark多级目录输出,且输出不同值,重写MultipleTextOutputFormat所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复