我是靠谱客的博主 负责柠檬,最近开发中收集的这篇文章主要介绍Spark多级目录输出,且输出不同值,重写MultipleTextOutputFormat,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

  • 数据结构
www.alibaba.com	222.38.194.133	440887	[09/20/2019 14:33:52 +0800]	河北	秦皇岛 铁通	2019	09	20
www.baidu.com	171.13.146.53	654665	[09/20/2019 14:33:52 +0800]	河南	焦作	电信	2019	09	20
www.baidu.com	139.207.106.29	722052	[09/20/2019 14:33:52 +0800]	四川		电信	2019	09	20
www.alibaba.com	182.91.131.22	22103	[09/20/2019 14:33:52 +0800]	广西	桂林	联通	2019	09	20
www.huawei.com	171.12.12.85	155927	[09/20/2019 14:33:52 +0800]	河南	郑州	电信	2019	09	20
//"""多目录输出:依据不同的key输出不同的数据
//  www.alibaba.com: ip,流量,省,地址
		182.86.190.207  2271 江西  景德镇
//  www.baidu.com:ip,流量,省,时间
		182.86.190.207  2271 江西 [09/20/2019 14:33:52 +0800]
//  www.huawei.com:ip,流量,省,地址,所属营业厅
		182.86.190.207  2271 江西  景德镇 电信	
//  
//"""
代码实现 
val dataRdd = sc.textFile("data/etl_access.log")
      .filter(x=>x.split("t").length==10)
      .map(x=> {
        val splits = x.split("t")
          val domain = splits(0)
          val ip = splits(1)
          val flow = splits(2)
          val pro = splits(4)
          val city = splits(5)
          val isp = splits(6)
          val year = splits(7)
          val month = splits(8)
          val day = splits(9)
        (domain,(domain,ip,flow,pro,city,isp,year,month,day))
      }).coalesce(1,true)
      .saveAsHadoopFile(outpath,classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat] )
 //注:saveAsHadoopFile 必须是key-value类型 ;saveAsTextFiled底层也是调用saveAsHadoopFile方法的
 
// 重写 MultipleTextOutputFormat 方法:

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any,Any]{
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
      //已key作为分目录
      val fliename = key+"/"+name
      fliename
    }

    override def generateActualKey(key: Any, value: Any): Any = {
      //输出数据 不需要key
      NullWritable.get()
    }

    override def generateActualValue(key: Any, value: Any): Any = {
      //根据不同的key ,输出不同的value 待优化--->可以从数据库获取想要输出的value
      val splits = value.toString.split(",")
      val sb = new StringBuffer
      key match {
        case "www.ruozedata.com" =>{
          sb.append(splits(0)).append("t")
            .append(splits(1)).append("t")
            .append(splits(3))
          sb
        }
        case "www.baidu.com" =>{
          sb.append(splits(0)).append("t")
            .append(splits(1)).append("t")
            .append(splits(2))
          sb
        }
        case "www.alibaba.com" =>{
          sb.append(splits(0)).append("t")
            .append(splits(2)).append("t")
            .append(splits(3)).append("t")
            .append(splits(4))
          sb
        }
        case _ =>NullWritable.get()
      }
    }

  }

最后

以上就是负责柠檬为你收集整理的Spark多级目录输出,且输出不同值,重写MultipleTextOutputFormat的全部内容,希望文章能够帮你解决Spark多级目录输出,且输出不同值,重写MultipleTextOutputFormat所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(40)

评论列表共有 0 条评论

立即
投稿
返回
顶部