我是靠谱客的博主 明理小鸽子,最近开发中收集的这篇文章主要介绍日志文件完成数据清洗,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

有下面两条json格式的数据文件需要完成数据清洗

1593136280858|{"cm":{"ln":"-55.0","sv":"V2.9.6","os":"8.0.4","g":"C6816QZ0@gmail.com","mid":"489","nw":"3G","l":"es","vc":"4","hw":"640*960","ar":"MX","uid":"489","t":"1593123253541","la":"5.2","md":"sumsung-18","vn":"1.3.4","ba":"Sumsung","sr":"I"},"ap":"app","et":[{"ett":"1593050051366","en":"loading","kv":{"extend2":"","loading_time":"14","action":"3","extend1":"","type":"2","type1":"201","loading_way":"1"}},{"ett":"1593108791764","en":"ad","kv":{"activityId":"1","displayMills":"78522","entry":"1","action":"1","contentType":"0"}},{"ett":"1593111271266","en":"notification","kv":{"ap_time":"1593097087883","action":"1","type":"1","content":""}},{"ett":"1593066033562","en":"active_background","kv":{"active_source":"3"}},{"ett":"1593135644347","en":"comment","kv":{"p_comment_id":1,"addtime":"1593097573725","praise_count":973,"other_id":5,"comment_id":9,"reply_count":40,"userid":7,"content":"辑赤蹲慰鸽抿肘捎"}}]}
1593136280858|{"cm":{"ln":"-114.9","sv":"V2.7.8","os":"8.0.4","g":"NW0S962J@gmail.com","mid":"490","nw":"3G","l":"pt","vc":"8","hw":"640*1136","ar":"MX","uid":"490","t":"1593121224789","la":"-44.4","md":"Huawei-8","vn":"1.0.1","ba":"Huawei","sr":"O"},"ap":"app","et":[{"ett":"1593063223807","en":"loading","kv":{"extend2":"","loading_time":"0","action":"3","extend1":"","type":"1","type1":"102","loading_way":"1"}},{"ett":"1593095105466","en":"ad","kv":{"activityId":"1","displayMills":"1966","entry":"3","action":"2","contentType":"0"}},{"ett":"1593051718208","en":"notification","kv":{"ap_time":"1593095336265","action":"2","type":"3","content":""}},{"ett":"1593100021275","en":"comment","kv":{"p_comment_id":4,"addtime":"1593098946009","praise_count":220,"other_id":4,"comment_id":9,"reply_count":151,"userid":4,"content":"抄应螟皮釉倔掉汉蛋蕾街羡晶"}},{"ett":"1593105344120","en":"praise","kv":{"target_id":9,"id":7,"type":1,"add_time":"1593098545976","userid":8}}]}

首先百度json格式转换一下

1593136280858 | {
	"cm": {
		"ln": "-55.0",
		"sv": "V2.9.6",
		"os": "8.0.4",
		"g": "C6816QZ0@gmail.com",
		"mid": "489",
		"nw": "3G",
		"l": "es",
		"vc": "4",
		"hw": "640*960",
		"ar": "MX",
		"uid": "489",
		"t": "1593123253541",
		"la": "5.2",
		"md": "sumsung-18",
		"vn": "1.3.4",
		"ba": "Sumsung",
		"sr": "I"
	},
	"ap": "app",
	"et": [{
		"ett": "1593050051366",
		"en": "loading",
		"kv": {
			"extend2": "",
			"loading_time": "14",
			"action": "3",
			"extend1": "",
			"type": "2",
			"type1": "201",
			"loading_way": "1"
		}
	}, {
		"ett": "1593108791764",
		"en": "ad",
		"kv": {
			"activityId": "1",
			"displayMills": "78522",
			"entry": "1",
			"action": "1",
			"contentType": "0"
		}
	}, {
		"ett": "1593111271266",
		"en": "notification",
		"kv": {
			"ap_time": "1593097087883",
			"action": "1",
			"type": "1",
			"content": ""
		}
	}, {
		"ett": "1593066033562",
		"en": "active_background",
		"kv": {
			"active_source": "3"
		}
	}, {
		"ett": "1593135644347",
		"en": "comment",
		"kv": {
			"p_comment_id": 1,
			"addtime": "1593097573725",
			"praise_count": 973,
			"other_id": 5,
			"comment_id": 9,
			"reply_count": 40,
			"userid": 7,
			"content": "辑赤蹲慰鸽抿肘捎"
		}
	}]
}

将文件上传到hdfs上

[root@spark01 ~]# hdfs dfs -put /opt/kb09file/op.log  /kb09file

用spark读取文件:

scala> sc.textFile("file:///opt/kb09file/op.log")   //读取Linux系统文件
scala> val fileRDD = sc.textFile("hdfs://192.168.168.210:9000/kb09file/op.log")  //读取hdfs文件
//1分析日志格式
日志格式为:1593136280858 (用户标识) |  json字符串的形式     //分隔符为 | 管道符

//2使用split切割字符串
//注意这里应该使用单引号而不是双引号。单引号表示字符,按字面的 | 切分;双引号表示字符串,split 会把它当作正则表达式,而 | 在正则中表示"或",结果会按单个字符切分。若要用字符串,需写成 split("\\|")
scala> val jsonStrRDD = fileRDD.map(x=>x.split('|')).map(x=>(x(0),x(1)))

//3如果想要保留用户标识,添加到json字符串中,进行字符串拼接:
scala> val jsonRdd = jsonStrRDD.map(x=>{ var jsonStr=x._2 ; jsonStr =  jsonStr.substring(0,jsonStr.length-1); jsonStr + ",\"id\":"+  x._1 + "}" })

//RDD转成DataFrame
scala> val jsonDF=jsonRdd.toDF

jsonDF结果:

jsonDF.show()     //只有一个列,列名为value
+--------------------+
|               value|
+--------------------+
|{"cm":{"ln":"-55....|
|{"cm":{"ln":"-114...|
+--------------------+
//需要添加的包
import spark.implicits._   //隐式转换
import org.apache.spark.sql.functions._   //内置方法
import org.apache.spark.sql.types._    //类型
import org.apache.spark.sql._

//获取json字符串的值的方法:
scala> jsonDF.select( get_json_object($"value","$.cm").alias("cm") ).show(false)
//结果
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cm                                                                                                                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"ln":"-55.0","sv":"V2.9.6","os":"8.0.4","g":"C6816QZ0@gmail.com","mid":"489","nw":"3G","l":"es","vc":"4","hw":"640*960","ar":"MX","uid":"489","t":"1593123253541","la":"5.2","md":"sumsung-18","vn":"1.3.4","ba":"Sumsung","sr":"I"} |
|{"ln":"-114.9","sv":"V2.7.8","os":"8.0.4","g":"NW0S962J@gmail.com","mid":"490","nw":"3G","l":"pt","vc":"8","hw":"640*1136","ar":"MX","uid":"490","t":"1593121224789","la":"-44.4","md":"Huawei-8","vn":"1.0.1","ba":"Huawei","sr":"O"}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

日志文件共可以拆分成4个json字符串,每组字符串还可以继续拆分json格式
将json字符串{“cm”:“a1” , “ap”:“b1” , “et”:“c1” , “id”:“d1”} 结构化

// Turn the JSON string {"cm":"a1" , "ap":"b1" , "et":"c1" , "id":"d1"} into columns:
//   header   cm    ap    et    id
//   row      a1    b1    c1    d1
// Each get_json_object call reads one top-level key out of the single `value`
// column; the nested `cm` object and `et` array stay as JSON text at this stage.
val jsonDF2: DataFrame = jsonDF.select(get_json_object($"value", "$.cm").alias("cm")
  , get_json_object($"value", "$.ap").alias("ap")
  , get_json_object($"value", "$.et").alias("et")
  , get_json_object($"value", "$.id").alias("id")
)
// Display the four extracted columns, reordered so that `id` comes first.
jsonDF2.select($"id", $"ap", $"cm", $"et").show()

结果

+-------------+---+--------------------+--------------------+
|           id| ap|                  cm|                  et|
+-------------+---+--------------------+--------------------+
|1593136280858|app|{"ln":"-55.0","sv...|[{"ett":"15930500...|
|1593136280858|app|{"ln":"-114.9","s...|[{"ett":"15930632...|
+-------------+---+--------------------+--------------------+

将cm继续拆分,把每个字段拆分成一个列:

// Flatten the `cm` JSON object: keep `id` and `ap`, extract every key of `cm`
// into a column of the same name, and carry the still-unparsed `et` array along
// as the last column.
val jsonDF3: DataFrame = jsonDF2.select($"id"
  , $"ap"
  , get_json_object($"cm", "$.ln").alias("ln")
  , get_json_object($"cm", "$.sv").alias("sv")
  , get_json_object($"cm", "$.os").alias("os")
  , get_json_object($"cm", "$.g").alias("g")
  , get_json_object($"cm", "$.mid").alias("mid")
  , get_json_object($"cm", "$.nw").alias("nw")
  , get_json_object($"cm", "$.l").alias("l")
  , get_json_object($"cm", "$.vc").alias("vc")
  , get_json_object($"cm", "$.hw").alias("hw")
  , get_json_object($"cm", "$.ar").alias("ar")
  , get_json_object($"cm", "$.uid").alias("uid")
  , get_json_object($"cm", "$.t").alias("t")
  , get_json_object($"cm", "$.la").alias("la")
  , get_json_object($"cm", "$.md").alias("md")
  , get_json_object($"cm", "$.vn").alias("vn")
  , get_json_object($"cm", "$.ba").alias("ba")
  , get_json_object($"cm", "$.sr").alias("sr")
  , $"et")
// Print the flattened table (long values are truncated by default).
jsonDF3.show()

输出结果:

+-------------+---+------+------+-----+------------------+---+---+---+---+--------+---+---+-------------+-----+----------+-----+-------+---+--------------------+
|           id| ap|    ln|    sv|   os|                 g|mid| nw|  l| vc|      hw| ar|uid|            t|   la|        md|   vn|     ba| sr|                  et|
+-------------+---+------+------+-----+------------------+---+---+---+---+--------+---+---+-------------+-----+----------+-----+-------+---+--------------------+
|1593136280858|app| -55.0|V2.9.6|8.0.4|C6816QZ0@gmail.com|489| 3G| es|  4| 640*960| MX|489|1593123253541|  5.2|sumsung-18|1.3.4|Sumsung|  I|[{"ett":"15930500...|
|1593136280858|app|-114.9|V2.7.8|8.0.4|NW0S962J@gmail.com|490| 3G| pt|  8|640*1136| MX|490|1593121224789|-44.4|  Huawei-8|1.0.1| Huawei|  O|[{"ett":"15930632...|
+-------------+---+------+------+-----+------------------+---+---+---+---+--------+---+---+-------------+-----+----------+-----+-------+---+--------------------+

idea下的代码:

package nj.zb.kb09.project

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Cleans the `op.log` event log with Spark SQL.
 *
 * Every input line has the form `<id>|<json object>`. The job
 *  1. merges the leading id into the JSON object as an extra `"id"` key,
 *  2. splits the object into its top-level keys `cm`, `ap`, `et` and `id`,
 *  3. flattens the nested `cm` object into one column per key,
 * printing the intermediate DataFrame after each step.
 */
object OpLog {

  /** Input path used when no path is given as the first program argument. */
  private val DefaultInputPath = "in/op.log"

  /** Keys of the `cm` JSON object; each one becomes a column of the same name. */
  private val CommonFields: Seq[String] = Seq(
    "ln", "sv", "os", "g", "mid", "nw", "l", "vc", "hw",
    "ar", "uid", "t", "la", "md", "vn", "ba", "sr")

  /**
   * Merges the id prefix of one raw log line into its JSON payload.
   *
   * @param line raw line of the form `<id>|<json object>`
   * @return the JSON object with an added `"id"` string member, or `None` when
   *         the line has no `|` separator or the payload is not wrapped in
   *         `{` ... `}` (such lines are dropped instead of failing the job)
   */
  private def mergeIdIntoJson(line: String): Option[String] =
    // Limit 2: only the first pipe separates id and payload, so a '|' inside
    // the JSON text itself cannot truncate the payload.
    line.split("\\|", 2) match {
      case Array(id, json) =>
        val body = json.trim
        if (body.startsWith("{") && body.endsWith("}")) {
          val open = body.substring(0, body.length - 1)
          // An empty object "{}" must not receive a leading comma.
          val separator = if (open.trim == "{") "" else ","
          Some(open + separator + "\"id\":\"" + id.trim + "\"}")
        } else None
      case _ => None
    }

  /**
   * Runs the cleaning job.
   *
   * @param args optional; `args(0)` overrides the default input path `in/op.log`
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("clearDemo").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.get_json_object

    try {
      val sc: SparkContext = spark.sparkContext
      val fileRDD: RDD[String] = sc.textFile(inputPath)
      val jsonRdd: RDD[String] = fileRDD.flatMap(line => mergeIdIntoJson(line).toList)

      // toDF on an RDD[String] yields a single column named "value".
      val jsonDF: DataFrame = jsonRdd.toDF
      jsonDF.show()

      jsonDF.select(get_json_object($"value", "$.cm").alias("cm")).show(false)

      // Step 2: one column per top-level key; `cm` and `et` remain JSON text.
      val jsonDF2: DataFrame = jsonDF.select(
        get_json_object($"value", "$.cm").alias("cm"),
        get_json_object($"value", "$.ap").alias("ap"),
        get_json_object($"value", "$.et").alias("et"),
        get_json_object($"value", "$.id").alias("id"))
      jsonDF2.select($"id", $"ap", $"cm", $"et").show()

      // Step 3: id, ap, every key of `cm`, then the unparsed `et` array.
      val commonColumns: Seq[Column] =
        CommonFields.map(field => get_json_object($"cm", "$." + field).alias(field))
      val columns: Seq[Column] = (Seq[Column]($"id", $"ap") ++ commonColumns) :+ $"et"
      val jsonDF3: DataFrame = jsonDF2.select(columns: _*)
      jsonDF3.show()
    } finally {
      // Release the local Spark resources even when a step above fails.
      spark.stop()
    }
  }
}

下面是在黑界面操作步骤结果:
val fileRDD = sc.textFile("file:///opt/kb09file/op.log")

val jsonStrRDD = fileRDD.map(x=>x.split('|')).map(x=>(x(0),x(1)))

val jsonRDD = jsonStrRDD.map(x=>{var jsonStr=x._2; jsonStr = jsonStr.substring(0,jsonStr.length-1); jsonStr+",\"id\":\""+x._1+"\"}" })

val jsonDF = jsonRDD.toDF

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

val jsonDF2 = jsonDF.select(get_json_object($"value","$.cm").alias("cm"),get_json_object($"value","$.ap").alias("ap"),get_json_object($"value","$.et").alias("et"),get_json_object($"value","$.id").alias("id"))

val jsonDF3 = jsonDF2.select($"id",$"ap",get_json_object($"cm","$.ln").alias("ln"),get_json_object($"cm","$.sv").alias("sv"),get_json_object($"cm","$.os").alias("os"),get_json_object($"cm","$.g").alias("g"),get_json_object($"cm","$.mid").alias("mid"),get_json_object($"cm","$.nw").alias("nw"),get_json_object($"cm","$.l").alias("l"),get_json_object($"cm","$.vc").alias("vc"),get_json_object($"cm","$.hw").alias("hw"),get_json_object($"cm","$.ar").alias("ar"),get_json_object($"cm","$.uid").alias("uid"),get_json_object($"cm","$.t").alias("t"),get_json_object($"cm","$.la").alias("la"),get_json_object($"cm","$.md").alias("md"),get_json_object($"cm","$.vn").alias("vn"),get_json_object($"cm","$.ba").alias("ba"),get_json_object($"cm","$.sr").alias("sr"),$"et")
在这里插入图片描述

val jsonDF4 = jsonDF3.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",from_json($"et",ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))).alias("event"))
在这里插入图片描述

val jsonDF5 = jsonDF4.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",explode($"event").alias("event"))
在这里插入图片描述

val jsonDF6 = jsonDF5.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"event.ett",$"event.en",$"event.kv")
在这里插入图片描述

val jsonDF7=
jsonDF6.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"kv")
在这里插入图片描述

val jsonDFloading =
jsonDF6.filter(x=>x.getAs("en")=="loading").select($"en",get_json_object($"kv","$.extend2").alias("extend2"),get_json_object($"kv","$.loading_time").alias("loading_time"),get_json_object($"kv","$.action").alias("action"),get_json_object($"kv","$.extend1").alias("extend1"),get_json_object($"kv","$.type").alias("type"),get_json_object($"kv","$.type1").alias("type1"),get_json_object($"kv","$.loading_way").alias("loading_way"))
在这里插入图片描述

val jsonDFad =
jsonDF6.filter(x=>x.getAs("en")=="ad").select($"en",get_json_object($"kv","$.activityId").alias("activityId"),get_json_object($"kv","$.displayMills").alias("displayMills"),get_json_object($"kv","$.entry").alias("entry"),get_json_object($"kv","$.action").alias("action"),get_json_object($"kv","$.contentType").alias("contentType"))
在这里插入图片描述

val jsonDFnotification =
jsonDF6.filter(x=>x.getAs("en")=="notification").select($"en",get_json_object($"kv","$.ap_time").alias("ap_time"),get_json_object($"kv","$.action").alias("action"),get_json_object($"kv","$.type").alias("type"),get_json_object($"kv","$.content").alias("content"))
在这里插入图片描述

val jsonDFcomment =
jsonDF6.filter(x=>x.getAs("en")=="comment").select($"en",get_json_object($"kv","$.p_comment_id").alias("p_comment_id"),get_json_object($"kv","$.addtime").alias("addtime"),get_json_object($"kv","$.praise_count").alias("praise_count"),get_json_object($"kv","$.other_id").alias("other_id"),get_json_object($"kv","$.comment_id").alias("comment_id"),get_json_object($"kv","$.reply_count").alias("reply_count"),get_json_object($"kv","$.userid").alias("userid"),get_json_object($"kv","$.content").alias("content"))

val jsonDFpraise =
jsonDF6.filter(x=>x.getAs("en")=="praise").select($"en",get_json_object($"kv","$.target_id").alias("target_id"),get_json_object($"kv","$.id").alias("id"),get_json_object($"kv","$.type").alias("type"),get_json_object($"kv","$.add_time").alias("add_time"),get_json_object($"kv","$.userid").alias("userid"))
在这里插入图片描述

最后

以上就是明理小鸽子为你收集整理的日志文件完成数据清洗的全部内容,希望文章能够帮你解决日志文件完成数据清洗所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(83)

评论列表共有 0 条评论

立即
投稿
返回
顶部