我是靠谱客的博主现实白猫。最近在开发中收集到这篇文章，主要介绍如何用自定义 bean 对象把日志转换成 Parquet 格式。觉得挺不错，现在分享给大家，希望能给大家做个参考。

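概述

原始日志每行包含 85 个以逗号分隔的字段。Scala 2.11 之前，case class 最多只能有 22 个参数。所以这里手写了一个继承 `Product with Serializable` 的普通类 `Log`，自行实现了 `productElement`、`productArity` 和 `canEqual`，让 Spark SQL 能通过 `createDataFrame` 推断出 DataFrame 的结构。主程序再把 bzip2 压缩的原始日志按省份、城市分区写成 Parquet 文件。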

自定义 Bean 对象（Log 类及其伴生对象）

package com.dmp.beans
import com.dmp.utils.NumFormat
/**
 * One parsed log record (85 fields) exposed through the [[scala.Product]] interface.
 *
 * This is a plain class that implements `Product` by hand instead of being a case class.
 * The index order in `productElement` mirrors the constructor parameter order; keep the
 * two in sync when adding or removing fields.
 * NOTE(review): presumably hand-written to get around the 22-parameter case-class limit of
 * Scala 2.10 — confirm the target Scala version before converting it to a case class.
 */
class Log(val sessionid: String,
          val advertisersid: Int,
          val adorderid: Int,
          val adcreativeid: Int,
          val adplatformproviderid: Int,
          val sdkversion: String,
          val adplatformkey: String,
          val putinmodeltype: Int,
          val requestmode: Int,
          val adprice: Double,
          val adppprice: Double,
          val requestdate: String,
          val ip: String,
          val appid: String,
          val appname: String,
          val uuid: String,
          val device: String,
          val client: Int,
          val osversion: String,
          val density: String,
          val pw: Int,
          val ph: Int,
          val long: String,
          val lat: String,
          val provincename: String,
          val cityname: String,
          val ispid: Int,
          val ispname: String,
          val networkmannerid: Int,
          val networkmannername: String,
          val iseffective: Int,
          val isbilling: Int,
          val adspacetype: Int,
          val adspacetypename: String,
          val devicetype: Int,
          val processnode: Int,
          val apptype: Int,
          val district: String,
          val paymode: Int,
          val isbid: Int,
          val bidprice: Double,
          val winprice: Double,
          val iswin: Int,
          val cur: String,
          val rate: Double,
          val cnywinprice: Double,
          val imei: String,
          val mac: String,
          val idfa: String,
          val openudid: String,
          val androidid: String,
          val rtbprovince: String,
          val rtbcity: String,
          val rtbdistrict: String,
          val rtbstreet: String,
          val storeurl: String,
          val realip: String,
          val isqualityapp: Int,
          val bidfloor: Double,
          val aw: Int,
          val ah: Int,
          val imeimd5: String,
          val macmd5: String,
          val idfamd5: String,
          val openudidmd5: String,
          val androididmd5: String,
          val imeisha1: String,
          val macsha1: String,
          val idfasha1: String,
          val openudidsha1: String,
          val androididsha1: String,
          val uuidunknow: String,
          val userid: String,
          val iptype: Int,
          val initbidprice: Double,
          val adpayment: Double,
          val agentrate: Double,
          val lomarkrate: Double,
          val adxrate: Double,
          val title: String,
          val keywords: String,
          val tagid: String,
          val callbackdate: String,
          val channelid: String,
          val mediatype: Int) extends Product with Serializable {

  /**
   * Maps a 0-based position to the corresponding constructor field.
   *
   * `productIterator` is built on this method, so positional consumers see the fields in
   * constructor order.
   *
   * @param n field position, valid range `0 until productArity`
   * @return the field value at position `n`
   * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`
   */
  override def productElement(n: Int): Any = n match {
    case 0 => sessionid
    case 1 => advertisersid
    case 2 => adorderid
    case 3 => adcreativeid
    case 4 => adplatformproviderid
    case 5 => sdkversion
    case 6 => adplatformkey
    case 7 => putinmodeltype
    case 8 => requestmode
    case 9 => adprice
    case 10 => adppprice
    case 11 => requestdate
    case 12 => ip
    case 13 => appid
    case 14 => appname
    case 15 => uuid
    case 16 => device
    case 17 => client
    case 18 => osversion
    case 19 => density
    case 20 => pw
    case 21 => ph
    case 22 => long
    case 23 => lat
    case 24 => provincename
    case 25 => cityname
    case 26 => ispid
    case 27 => ispname
    case 28 => networkmannerid
    case 29 => networkmannername
    case 30 => iseffective
    case 31 => isbilling
    case 32 => adspacetype
    case 33 => adspacetypename
    case 34 => devicetype
    case 35 => processnode
    case 36 => apptype
    case 37 => district
    case 38 => paymode
    case 39 => isbid
    case 40 => bidprice
    case 41 => winprice
    case 42 => iswin
    case 43 => cur
    case 44 => rate
    case 45 => cnywinprice
    case 46 => imei
    case 47 => mac
    case 48 => idfa
    case 49 => openudid
    case 50 => androidid
    case 51 => rtbprovince
    case 52 => rtbcity
    case 53 => rtbdistrict
    case 54 => rtbstreet
    case 55 => storeurl
    case 56 => realip
    case 57 => isqualityapp
    case 58 => bidfloor
    case 59 => aw
    case 60 => ah
    case 61 => imeimd5
    case 62 => macmd5
    case 63 => idfamd5
    case 64 => openudidmd5
    case 65 => androididmd5
    case 66 => imeisha1
    case 67 => macsha1
    case 68 => idfasha1
    case 69 => openudidsha1
    case 70 => androididsha1
    case 71 => uuidunknow
    case 72 => userid
    case 73 => iptype
    case 74 => initbidprice
    case 75 => adpayment
    case 76 => agentrate
    case 77 => lomarkrate
    case 78 => adxrate
    case 79 => title
    case 80 => keywords
    case 81 => tagid
    case 82 => callbackdate
    case 83 => channelid
    case 84 => mediatype
    // Product's contract: out-of-range positions raise IndexOutOfBoundsException, not MatchError.
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  /** Number of fields exposed via `productElement` — one per constructor parameter. */
  override def productArity: Int = 85

  /**
   * Reports whether `that` is a [[Log]] (type check only; it does not compare fields or identity).
   *
   * Note: this class does not override `equals`/`hashCode`, so `==` on two `Log`s is still
   * reference equality; `canEqual` only matters to `equals` implementations that consult it.
   */
  override def canEqual(that: Any): Boolean = that match {
    case _: Log => true
    case _      => false
  }
}

//伴生对象:提供 apply 方法,将切分后的字段数组转换为 Log 对象
object Log {

  /**
   * Builds a [[Log]] from one log line that has already been split on commas.
   *
   * Element `k` of `arr` feeds constructor parameter `k`. Integer columns go through
   * `NumFormat.toInt`, floating-point columns through `NumFormat.toDouble`, and all other
   * columns are passed through unchanged.
   *
   * @param arr the split fields; indices 0 to 84 are read, so it must contain at least
   *            85 elements (a shorter array makes `arr(k)` throw)
   * @return the populated record
   */
  def apply(arr: Array[String]): Log = {
    // Typed column readers — one per constructor field type.
    def str(k: Int): String = arr(k)
    def int(k: Int): Int = NumFormat.toInt(arr(k))
    def dbl(k: Int): Double = NumFormat.toDouble(arr(k))

    // Ten columns per row: row r covers indices 10*r .. 10*r + 9.
    new Log(
      str(0), int(1), int(2), int(3), int(4), str(5), str(6), int(7), int(8), dbl(9),
      dbl(10), str(11), str(12), str(13), str(14), str(15), str(16), int(17), str(18), str(19),
      int(20), int(21), str(22), str(23), str(24), str(25), int(26), str(27), int(28), str(29),
      int(30), int(31), int(32), str(33), int(34), int(35), int(36), str(37), int(38), int(39),
      dbl(40), dbl(41), int(42), str(43), dbl(44), dbl(45), str(46), str(47), str(48), str(49),
      str(50), str(51), str(52), str(53), str(54), str(55), str(56), int(57), dbl(58), int(59),
      int(60), str(61), str(62), str(63), str(64), str(65), str(66), str(67), str(68), str(69),
      str(70), str(71), str(72), int(73), dbl(74), dbl(75), dbl(76), dbl(77), dbl(78), str(79),
      str(80), str(81), str(82), str(83), int(84)
    )
  }
}

主程序类（Bzip2ParquetV2）

package com.dmp.tools

import com.dmp.beans.Log
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
示例运行参数(依次为 logInputPath compressionCode resultOutputPath):
F:\牛牛学堂大数据24期\09-实训实战-9天\dmp&&移动项目\dmp\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
snappy
C:\Users\admin\Desktop\result2
 */
//将原始日志文件转换成 Parquet 文件格式。Spark 1.6 中 Parquet 默认使用 gzip 压缩,2.x 版本默认使用 snappy 压缩;本程序通过命令行参数显式指定压缩方式
object Bzip2ParquetV2 {

  /**
   * Entry point: converts a raw comma-separated log file into Parquet files partitioned
   * by `provincename` and `cityname`.
   *
   * @param args exactly three values, in order:
   *             `logInputPath` – path read with `SparkContext.textFile`;
   *             `compressionCode` – value for `spark.sql.parquet.compression.codec`
   *             (the usage text suggests snappy, gzip or lzo);
   *             `resultOutputPath` – directory the Parquet output is written to.
   *             With any other argument count the usage text is printed and the process
   *             exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    // 0. Validate the argument count.
    if (args.length != 3) {
      println(
        """
          |com.dmp.tools.Bzip2ParquetV2
          |参数:
          | logInputPath
          | compressionCode <snappy, gzip, lzo>
          | resultOutputPath
        """.stripMargin // stripMargin drops each line's leading whitespace up to and including '|'
      )
      // Exit non-zero so callers (shell scripts, schedulers) treat a usage error as a failure.
      sys.exit(1)
    }

    // 1. Program arguments: log input path, Parquet compression codec, output path.
    val Array(logInputPath, compressionCode, resultOutputPath) = args

    val conf = new SparkConf()
      // getSimpleName of a Scala object ends in '$'; drop it for a readable app name.
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      // Default to local mode only when no master was supplied (e.g. via spark-submit --master).
      .setIfMissing("spark.master", "local[*]")
      // Use Kryo for Spark's data serialization (shuffle transfers between workers, serialized
      // storage); redundant if the cluster configuration already sets it.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Register the custom class with Kryo. This MUST happen before the SparkContext is
      // created: SparkContext clones its SparkConf, so later changes to `conf` are ignored.
      .registerKryoClasses(Array(classOf[Log]))

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      sqlContext.setConf("spark.sql.parquet.compression.codec", compressionCode)

      // 2. Read the raw log lines.
      val rawData = sc.textFile(logInputPath)

      // 3. ETL: split on ',' with limit -1 so trailing empty fields are kept, drop lines with
      //    fewer than 85 fields, and build one Log per remaining line.
      val dataLog = rawData
        .map(line => line.split(",", -1))
        .filter(_.length >= 85)
        .map(arr => Log(arr))

      // 4. Write the result as Parquet to resultOutputPath, partitioned by province and city.
      val dataFrame = sqlContext.createDataFrame(dataLog)
      dataFrame.write.partitionBy("provincename", "cityname").parquet(resultOutputPath)
    } finally {
      // Stop the context even if the job fails, so its resources are released.
      sc.stop()
    }
  }
}

最后

以上就是现实白猫为你收集整理的自定义bean对象实现日志转换parquet的全部内容,希望文章能够帮你解决自定义bean对象实现日志转换parquet所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即投稿
返回顶部