概述
我们都知道Parquet的基于列式存储的文件详情参看:https://blog.csdn.net/weixin_39043567/article/details/89874304
虽然log文件是有一定的规律,但是不够规整,我们可以规整后写入到parquet文件中,使用时直接spark sql就可以都进来进行相应的操作。
那么我们怎么把需要处理的log文件写成parquet文件呢?本文提供两种常用的简单方式:
1、采用Row类进行实现
object LogToParquet {
def main(args: Array[String]): Unit = {
//判断参数输入是否正确
if (args.length != 3) {
println("参数有误")
sys.exit()
}
//接收参数
val Array(logInputPath, compressionCode, resultOutputPath) = args
val conf = new SparkConf()
conf.setMaster("xx")
.setAppName("xx")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.parquet.compression.codec", compressionCode)
val sc = new SparkContext(conf)
val ss = new SparkSession()
//读取数据
val rowData: RDD[String] = sc.textFile("logInputPath")
val RowData: RDD[Row] = rowData.map(line => line.split(",")) //返回RDD[Array[String]]
.filter(_.length >= 85)
.map(arr => {
//=>每一个数组中的内内容都映射成Row
将Row关联到schema就可以创建df了
Row(
arr(0)
arr (1)
//...根据需要
)
})
//创建schema
也可以单独写一个util类封装
val schema = StructType(List(
StructField("属性名0", 类型)
StructField("属性名1", 类型)
//....
))
//创建带有df,并存储结果parquet
val dataFrame: DataFrame = ss.createDataFrame(RowData, schema)
dataFrame.write.parquet("resultOutputPath")
sc.stop()
}
}
2、采用自定义类的方式实现
注意自定义类需要设置序列化方式,以及哪个类需要序列化,否则运行时会报序列化相关的错。
object LogToParquetV2 {
def main(args: Array[String]): Unit = {
if(args.length!=3){
println("参数有误")
sys.exit()
}
//接收参数args(0)args(1)args(2)
val Array(logInputPath, compressionCode,resultOutputPath)=args
val conf = new SparkConf()
conf.setMaster("")
.setAppName("")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")//设置自定义类的序列化方式
.set("spark.sql.parquet.compression.codec", compressionCode)
conf.registerKryoClasses(Array(classOf[Log]))//refister需要序列化的类型
val sc = new SparkContext(conf)
val ss = new SparkSession()
//读取数据
val rowData: RDD[String] = sc.textFile("logInputPath")
val logRdd: RDD[Log] = rowData.map(_.split(",", -1)).filter(_.length >= 85)
.map(arr => Log(arr))
val df: DataFrame = ss.createDataFrame(logRdd)
//coalesce合并partition,
df.coalesce(1).write.partitionBy("provincename","cityname").parquet("resultOutputPath")
}
}
Log类:
class Log(
val xxxxid: String,
val yyyyyid: Int,
) extends Product with Serializable{
override def productElement(n: Int): Any = n match {
case 0 => xxxxid
case 1 => yyyyyid
}
override def productArity: Int = 20
override def canEqual(that: Any): Boolean = that.isInstanceOf[Log]
}
object Log{
def apply(arr:Array[String]):Log=new Log(
arr(0)
arr(1)
//注意需要转成相应的类型。
)
}
最后
以上就是苹果外套为你收集整理的Spark处理Log文件写成Parquet文件的两种方式的全部内容,希望文章能够帮你解决Spark处理Log文件写成Parquet文件的两种方式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复