概述
样例对象
package com.dmp.beans
import com.dmp.utils.NumFormat
/**
 * One parsed line of the raw DMP ad log: 85 positional fields.
 *
 * This is a plain class that implements [[scala.Product]] by hand rather than a
 * case class. NOTE(review): presumably because the build targets Scala 2.10, where
 * case classes are capped at 22 fields — confirm against the build.
 *
 * `SQLContext.createDataFrame` (used with an RDD of Log by the ETL job) accepts
 * `RDD[A <: Product]`, so the `productElement` mapping below must list the fields
 * in exactly the same order as the constructor parameters.
 *
 * `equals`/`hashCode` are not overridden, so `==` between two Log instances is
 * reference equality.
 */
class Log(
    val sessionid: String,
    val advertisersid: Int,
    val adorderid: Int,
    val adcreativeid: Int,
    val adplatformproviderid: Int,
    val sdkversion: String,
    val adplatformkey: String,
    val putinmodeltype: Int,
    val requestmode: Int,
    val adprice: Double,
    val adppprice: Double,
    val requestdate: String,
    val ip: String,
    val appid: String,
    val appname: String,
    val uuid: String,
    val device: String,
    val client: Int,
    val osversion: String,
    val density: String,
    val pw: Int,
    val ph: Int,
    val long: String,
    val lat: String,
    val provincename: String,
    val cityname: String,
    val ispid: Int,
    val ispname: String,
    val networkmannerid: Int,
    val networkmannername: String,
    val iseffective: Int,
    val isbilling: Int,
    val adspacetype: Int,
    val adspacetypename: String,
    val devicetype: Int,
    val processnode: Int,
    val apptype: Int,
    val district: String,
    val paymode: Int,
    val isbid: Int,
    val bidprice: Double,
    val winprice: Double,
    val iswin: Int,
    val cur: String,
    val rate: Double,
    val cnywinprice: Double,
    val imei: String,
    val mac: String,
    val idfa: String,
    val openudid: String,
    val androidid: String,
    val rtbprovince: String,
    val rtbcity: String,
    val rtbdistrict: String,
    val rtbstreet: String,
    val storeurl: String,
    val realip: String,
    val isqualityapp: Int,
    val bidfloor: Double,
    val aw: Int,
    val ah: Int,
    val imeimd5: String,
    val macmd5: String,
    val idfamd5: String,
    val openudidmd5: String,
    val androididmd5: String,
    val imeisha1: String,
    val macsha1: String,
    val idfasha1: String,
    val openudidsha1: String,
    val androididsha1: String,
    val uuidunknow: String,
    val userid: String,
    val iptype: Int,
    val initbidprice: Double,
    val adpayment: Double,
    val agentrate: Double,
    val lomarkrate: Double,
    val adxrate: Double,
    val title: String,
    val keywords: String,
    val tagid: String,
    val callbackdate: String,
    val channelid: String,
    val mediatype: Int)
  extends Product with Serializable {

  /**
   * Maps a positional index to the corresponding field value.
   *
   * @param n zero-based field index, in constructor-parameter order
   * @return the (boxed) field value
   * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`,
   *         as required by the [[scala.Product]] contract
   */
  override def productElement(n: Int): Any = n match {
    case 0 => sessionid
    case 1 => advertisersid
    case 2 => adorderid
    case 3 => adcreativeid
    case 4 => adplatformproviderid
    case 5 => sdkversion
    case 6 => adplatformkey
    case 7 => putinmodeltype
    case 8 => requestmode
    case 9 => adprice
    case 10 => adppprice
    case 11 => requestdate
    case 12 => ip
    case 13 => appid
    case 14 => appname
    case 15 => uuid
    case 16 => device
    case 17 => client
    case 18 => osversion
    case 19 => density
    case 20 => pw
    case 21 => ph
    case 22 => long
    case 23 => lat
    case 24 => provincename
    case 25 => cityname
    case 26 => ispid
    case 27 => ispname
    case 28 => networkmannerid
    case 29 => networkmannername
    case 30 => iseffective
    case 31 => isbilling
    case 32 => adspacetype
    case 33 => adspacetypename
    case 34 => devicetype
    case 35 => processnode
    case 36 => apptype
    case 37 => district
    case 38 => paymode
    case 39 => isbid
    case 40 => bidprice
    case 41 => winprice
    case 42 => iswin
    case 43 => cur
    case 44 => rate
    case 45 => cnywinprice
    case 46 => imei
    case 47 => mac
    case 48 => idfa
    case 49 => openudid
    case 50 => androidid
    case 51 => rtbprovince
    case 52 => rtbcity
    case 53 => rtbdistrict
    case 54 => rtbstreet
    case 55 => storeurl
    case 56 => realip
    case 57 => isqualityapp
    case 58 => bidfloor
    case 59 => aw
    case 60 => ah
    case 61 => imeimd5
    case 62 => macmd5
    case 63 => idfamd5
    case 64 => openudidmd5
    case 65 => androididmd5
    case 66 => imeisha1
    case 67 => macsha1
    case 68 => idfasha1
    case 69 => openudidsha1
    case 70 => androididsha1
    case 71 => uuidunknow
    case 72 => userid
    case 73 => iptype
    case 74 => initbidprice
    case 75 => adpayment
    case 76 => agentrate
    case 77 => lomarkrate
    case 78 => adxrate
    case 79 => title
    case 80 => keywords
    case 81 => tagid
    case 82 => callbackdate
    case 83 => channelid
    case 84 => mediatype
    // Without this case an invalid index surfaced as a MatchError.
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  /** Number of fields; must equal the number of constructor parameters (85). */
  override def productArity: Int = 85

  /**
   * Only another Log is a candidate for equality with this one.
   *
   * This does not compare field values. `equals` is not overridden, so `==` is
   * still reference identity.
   */
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Log]
}
/** Companion object for [[Log]]: builds instances from split log lines. */
object Log {

  /**
   * Builds a [[Log]] from the raw string fields of one log line.
   *
   * Fields are consumed positionally: `arr(i)` feeds the i-th constructor
   * parameter. Numeric columns are converted with `NumFormat.toInt` /
   * `NumFormat.toDouble`. How those treat blank or malformed text is defined
   * in NumFormat, not here.
   *
   * @param arr the split fields. Indices 0 to 84 are read, so `arr` must contain
   *            at least 85 elements; a shorter array raises
   *            ArrayIndexOutOfBoundsException.
   * @return a new Log populated from `arr`
   */
  def apply(arr: Array[String]): Log = {
    // Positional accessors, one per column kind, to keep the mapping readable.
    def str(i: Int) = arr(i)
    def asInt(i: Int) = NumFormat.toInt(arr(i))
    def asDouble(i: Int) = NumFormat.toDouble(arr(i))

    new Log(
      // fields 0-9
      str(0), asInt(1), asInt(2), asInt(3), asInt(4),
      str(5), str(6), asInt(7), asInt(8), asDouble(9),
      // fields 10-19
      asDouble(10), str(11), str(12), str(13), str(14),
      str(15), str(16), asInt(17), str(18), str(19),
      // fields 20-29
      asInt(20), asInt(21), str(22), str(23), str(24),
      str(25), asInt(26), str(27), asInt(28), str(29),
      // fields 30-39
      asInt(30), asInt(31), asInt(32), str(33), asInt(34),
      asInt(35), asInt(36), str(37), asInt(38), asInt(39),
      // fields 40-49
      asDouble(40), asDouble(41), asInt(42), str(43), asDouble(44),
      asDouble(45), str(46), str(47), str(48), str(49),
      // fields 50-59
      str(50), str(51), str(52), str(53), str(54),
      str(55), str(56), asInt(57), asDouble(58), asInt(59),
      // fields 60-69
      asInt(60), str(61), str(62), str(63), str(64),
      str(65), str(66), str(67), str(68), str(69),
      // fields 70-79
      str(70), str(71), str(72), asInt(73), asDouble(74),
      asDouble(75), asDouble(76), asDouble(77), asDouble(78), str(79),
      // fields 80-84
      str(80), str(81), str(82), str(83), asInt(84)
    )
  }
}
主体类
package com.dmp.tools
import com.dmp.beans.Log
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/*
F:\牛牛学堂大数据24期\09-实训实战-9天\dmp&&移动项目\dmp\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
snappy
C:\Users\admin\Desktop\result2
*/
/**
 * Converts the raw comma-separated DMP log into Parquet, partitioned by
 * `provincename` and `cityname`.
 *
 * The Parquet compression codec comes from the command line. Per the original
 * author's note, Spark 1.6 defaults to gzip and Spark 2.x to snappy when unset.
 */
object Bzip2ParquetV2 {

  /** Minimum number of comma-separated fields a line needs to build a [[Log]]. */
  private val LogFieldCount = 85

  /**
   * Entry point.
   *
   * @param args exactly three arguments: logInputPath, compressionCode
   *             (snappy, gzip or lzo), resultOutputPath. Any other count prints
   *             usage and exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    // 0. Validate the argument count.
    if (args.length != 3) {
      println(
        """
          |com.dmp.tools.Bzip2ParquetV2
          |参数:
          | logInputPath
          | compressionCode <snappy, gzip, lzo>
          | resultOutputPath
        """.stripMargin // stripMargin removes the leading "|" margin from each line
      )
      // Non-zero status so scripts/schedulers can detect the usage error.
      sys.exit(1)
    }

    // 1. Program arguments: log input path, compression codec, output path.
    val Array(logInputPath, compressionCode, resultOutputPath) = args

    // 2. Spark configuration. Kryo classes must be registered BEFORE the
    //    SparkContext is created: SparkContext clones the SparkConf when it is
    //    constructed, so registering afterwards has no effect.
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      // Local mode for IDE runs, without overriding `spark-submit --master`.
      .setIfMissing("spark.master", "local[*]")
      // Kryo for RDD serialization (spill to disk, transfers between workers);
      // redundant if the cluster already configures it.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Log]))

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      sqlContext.setConf("spark.sql.parquet.compression.codec", compressionCode)

      // 3. Read the raw log lines.
      val rawData = sc.textFile(logInputPath)

      // 4. ETL: split on commas (limit -1 keeps trailing empty fields) and drop
      //    lines with too few fields to build a Log.
      val dataLog = rawData
        .map(_.split(",", -1))
        .filter(_.length >= LogFieldCount)
        .map(arr => Log(arr))

      // 5. Convert to a DataFrame and write Parquet, partitioned by province and city.
      val dataFrame = sqlContext.createDataFrame(dataLog)
      dataFrame.write.partitionBy("provincename", "cityname").parquet(resultOutputPath)
    } finally {
      // Always release the context, even if the job fails.
      sc.stop()
    }
  }
}
最后
以上就是现实白猫为你收集整理的自定义bean对象实现日志转换parquet的全部内容,希望文章能够帮你解决自定义bean对象实现日志转换parquet所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复