概述
有下面两条json格式的数据文件需要完成数据清洗
1593136280858|{"cm":{"ln":"-55.0","sv":"V2.9.6","os":"8.0.4","g":"C6816QZ0@gmail.com","mid":"489","nw":"3G","l":"es","vc":"4","hw":"640*960","ar":"MX","uid":"489","t":"1593123253541","la":"5.2","md":"sumsung-18","vn":"1.3.4","ba":"Sumsung","sr":"I"},"ap":"app","et":[{"ett":"1593050051366","en":"loading","kv":{"extend2":"","loading_time":"14","action":"3","extend1":"","type":"2","type1":"201","loading_way":"1"}},{"ett":"1593108791764","en":"ad","kv":{"activityId":"1","displayMills":"78522","entry":"1","action":"1","contentType":"0"}},{"ett":"1593111271266","en":"notification","kv":{"ap_time":"1593097087883","action":"1","type":"1","content":""}},{"ett":"1593066033562","en":"active_background","kv":{"active_source":"3"}},{"ett":"1593135644347","en":"comment","kv":{"p_comment_id":1,"addtime":"1593097573725","praise_count":973,"other_id":5,"comment_id":9,"reply_count":40,"userid":7,"content":"辑赤蹲慰鸽抿肘捎"}}]}
1593136280858|{"cm":{"ln":"-114.9","sv":"V2.7.8","os":"8.0.4","g":"NW0S962J@gmail.com","mid":"490","nw":"3G","l":"pt","vc":"8","hw":"640*1136","ar":"MX","uid":"490","t":"1593121224789","la":"-44.4","md":"Huawei-8","vn":"1.0.1","ba":"Huawei","sr":"O"},"ap":"app","et":[{"ett":"1593063223807","en":"loading","kv":{"extend2":"","loading_time":"0","action":"3","extend1":"","type":"1","type1":"102","loading_way":"1"}},{"ett":"1593095105466","en":"ad","kv":{"activityId":"1","displayMills":"1966","entry":"3","action":"2","contentType":"0"}},{"ett":"1593051718208","en":"notification","kv":{"ap_time":"1593095336265","action":"2","type":"3","content":""}},{"ett":"1593100021275","en":"comment","kv":{"p_comment_id":4,"addtime":"1593098946009","praise_count":220,"other_id":4,"comment_id":9,"reply_count":151,"userid":4,"content":"抄应螟皮釉倔掉汉蛋蕾街羡晶"}},{"ett":"1593105344120","en":"praise","kv":{"target_id":9,"id":7,"type":1,"add_time":"1593098545976","userid":8}}]}
首先用在线 JSON 格式化工具(百度搜索"json格式化"即可)把第一条数据格式化一下,方便看清它的层级结构:
1593136280858 | {
"cm": {
"ln": "-55.0",
"sv": "V2.9.6",
"os": "8.0.4",
"g": "C6816QZ0@gmail.com",
"mid": "489",
"nw": "3G",
"l": "es",
"vc": "4",
"hw": "640*960",
"ar": "MX",
"uid": "489",
"t": "1593123253541",
"la": "5.2",
"md": "sumsung-18",
"vn": "1.3.4",
"ba": "Sumsung",
"sr": "I"
},
"ap": "app",
"et": [{
"ett": "1593050051366",
"en": "loading",
"kv": {
"extend2": "",
"loading_time": "14",
"action": "3",
"extend1": "",
"type": "2",
"type1": "201",
"loading_way": "1"
}
}, {
"ett": "1593108791764",
"en": "ad",
"kv": {
"activityId": "1",
"displayMills": "78522",
"entry": "1",
"action": "1",
"contentType": "0"
}
}, {
"ett": "1593111271266",
"en": "notification",
"kv": {
"ap_time": "1593097087883",
"action": "1",
"type": "1",
"content": ""
}
}, {
"ett": "1593066033562",
"en": "active_background",
"kv": {
"active_source": "3"
}
}, {
"ett": "1593135644347",
"en": "comment",
"kv": {
"p_comment_id": 1,
"addtime": "1593097573725",
"praise_count": 973,
"other_id": 5,
"comment_id": 9,
"reply_count": 40,
"userid": 7,
"content": "辑赤蹲慰鸽抿肘捎"
}
}]
}
将文件上传到hdfs上
[root@spark01 ~]# hdfs dfs -put /opt/kb09file/op.log /kb09file
用spark读取文件:
scala> sc.textFile("file:///opt/kb09file/op.log") //读取Linux系统文件
scala> val fileRDD = sc.textFile("hdfs://192.168.168.210:9000/kb09file/op.log") //读取hdfs文件
//1分析日志格式
日志格式为:1593136280858 (用户标识) | json字符串的形式 //分隔符为 | 管道符
//2使用split切割字符串
//注意这里应该使用单引号 '|'(字符)而不是双引号 "|"(字符串)。split(String) 会把参数当作正则表达式,而 | 在正则中表示"或",结果会把每个字符都切开;如果一定要传字符串,需要转义写成 "\\|"
scala> val jsonStrRDD = fileRDD.map(x=>x.split('|')).map(x=>(x(0),x(1)))
//3如果想要保留用户标识,添加到json字符串中,进行字符串拼接:
scala> val jsonRdd = jsonStrRDD.map(x=>{ var jsonStr=x._2 ; jsonStr = jsonStr.substring(0,jsonStr.length-1); jsonStr + ",\"id\":"+ x._1 + "}" })
//RDD转成DataFrame
scala> val jsonDF=jsonRdd.toDF
jsonDF结果:
jsonDF.show() //只有一个列,列名为value
+--------------------+
| value|
+--------------------+
|{"cm":{"ln":"-55....|
|{"cm":{"ln":"-114...|
+--------------------+
//需要添加的包
import spark.implicits._ //隐式转换
import org.apache.spark.sql.functions._ //内置方法
import org.apache.spark.sql.types._ //类型
import org.apache.spark.sql._
//获取json字符串的值的方法:
scala> jsonDF.select( get_json_object($"value","$.cm").alias("cm") ).show(false)
//结果
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cm |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"ln":"-55.0","sv":"V2.9.6","os":"8.0.4","g":"C6816QZ0@gmail.com","mid":"489","nw":"3G","l":"es","vc":"4","hw":"640*960","ar":"MX","uid":"489","t":"1593123253541","la":"5.2","md":"sumsung-18","vn":"1.3.4","ba":"Sumsung","sr":"I"} |
|{"ln":"-114.9","sv":"V2.7.8","os":"8.0.4","g":"NW0S962J@gmail.com","mid":"490","nw":"3G","l":"pt","vc":"8","hw":"640*1136","ar":"MX","uid":"490","t":"1593121224789","la":"-44.4","md":"Huawei-8","vn":"1.0.1","ba":"Huawei","sr":"O"}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
拼接 id 之后,每条日志的 json 字符串共有 4 个顶层字段:cm、ap、et、id;其中 cm(json 对象)和 et(json 数组)的值还可以继续按 json 格式拆分
将json字符串{“cm”:“a1” , “ap”:“b1” , “et”:“c1” , “id”:“d1”} 结构化
//将json字符串{"cm":"a1" , "ap":"b1" , "et":"c1" , "id":"d1"} 结构化
// 表头 cm ap et id
// 列 a1 b1 c1 d1
// Promote the four top-level JSON keys of the `value` column to columns of their own.
// The key list drives both the JSON path ("$.<key>") and the resulting column name.
val jsonDF2: DataFrame = jsonDF.select(
  Seq("cm", "ap", "et", "id").map(key => get_json_object($"value", "$." + key).alias(key)): _*
)
// Display with the id first, followed by ap, cm and et.
jsonDF2.select($"id", $"ap", $"cm", $"et").show()
结果
+-------------+---+--------------------+--------------------+
| id| ap| cm| et|
+-------------+---+--------------------+--------------------+
|1593136280858|app|{"ln":"-55.0","sv...|[{"ett":"15930500...|
|1593136280858|app|{"ln":"-114.9","s...|[{"ett":"15930632...|
+-------------+---+--------------------+--------------------+
将cm继续拆分,把每个字段拆分成一个列:
// Flatten the `cm` JSON object: `id` and `ap` stay in front, every listed `cm` key becomes
// a column named after the key, and the still-unparsed `et` array stays as the last column.
val jsonDF3: DataFrame = jsonDF2.select(
  (Seq[Column]($"id", $"ap") ++
    Seq("ln", "sv", "os", "g", "mid", "nw", "l", "vc", "hw", "ar", "uid", "t", "la", "md", "vn", "ba", "sr")
      .map(key => get_json_object($"cm", "$." + key).alias(key)) :+
    $"et"): _*
)
jsonDF3.show()
输出结果:
+-------------+---+------+------+-----+------------------+---+---+---+---+--------+---+---+-------------+-----+----------+-----+-------+---+--------------------+
| id| ap| ln| sv| os| g|mid| nw| l| vc| hw| ar|uid| t| la| md| vn| ba| sr| et|
+-------------+---+------+------+-----+------------------+---+---+---+---+--------+---+---+-------------+-----+----------+-----+-------+---+--------------------+
|1593136280858|app| -55.0|V2.9.6|8.0.4|C6816QZ0@gmail.com|489| 3G| es| 4| 640*960| MX|489|1593123253541| 5.2|sumsung-18|1.3.4|Sumsung| I|[{"ett":"15930500...|
|1593136280858|app|-114.9|V2.7.8|8.0.4|NW0S962J@gmail.com|490| 3G| pt| 8|640*1136| MX|490|1593121224789|-44.4| Huawei-8|1.0.1| Huawei| O|[{"ett":"15930632...|
+-------------+---+------+------+-----+------------------+---+---+---+---+--------+---+---+-------------+-----+----------+-----+-------+---+--------------------+
idea下的代码:
package nj.zb.kb09.project
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Reads the pipe-delimited log file `in/op.log`, where each line has the form
 * `<id>|<json object>`, merges the leading id into the JSON object as an `id` key,
 * and flattens the result step by step into a DataFrame with one column per `cm` key.
 */
object OpLog {

  /** Keys read from the nested `cm` JSON object, in the order they appear as output columns. */
  private val CommonFields: Seq[String] = Seq(
    "ln", "sv", "os", "g", "mid", "nw", "l", "vc", "hw", "ar", "uid", "t", "la", "md", "vn", "ba", "sr")

  /**
   * Program entry point; prints every intermediate DataFrame with `show`.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("clearDemo").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.get_json_object

    try {
      val sc: SparkContext = spark.sparkContext
      val fileRDD: RDD[String] = sc.textFile("in/op.log")

      // Split on the FIRST pipe only (limit = 2) so a '|' inside the JSON payload cannot
      // truncate it. Lines without a pipe, or whose payload does not end in '}', are
      // skipped instead of failing the job with an ArrayIndexOutOfBoundsException.
      val jsonStrRDD: RDD[(String, String)] = fileRDD
        .map(_.split("\\|", 2))
        .collect { case Array(id, json) if json.trim.endsWith("}") => (id.trim, json.trim) }

      // Re-open the JSON object (drop the trailing '}') and append the id as a string member.
      // The id is quoted so that a non-numeric id still yields valid JSON.
      val jsonRdd: RDD[String] = jsonStrRDD.map { case (id, json) =>
        val body = json.substring(0, json.length - 1)
        // An empty object "{}" must not receive a leading comma.
        val separator = if (body.trim == "{") "" else ","
        body + separator + "\"id\":\"" + id + "\"}"
      }

      // toDF on an RDD[String] yields a single column named "value".
      val jsonDF: DataFrame = jsonRdd.toDF
      jsonDF.show()

      jsonDF.select(get_json_object($"value", "$.cm").alias("cm")).show(false)

      // Step 1: promote the four top-level keys to columns.
      val jsonDF2: DataFrame = jsonDF.select(
        get_json_object($"value", "$.cm").alias("cm"),
        get_json_object($"value", "$.ap").alias("ap"),
        get_json_object($"value", "$.et").alias("et"),
        get_json_object($"value", "$.id").alias("id")
      )
      jsonDF2.select($"id", $"ap", $"cm", $"et").show

      // Step 2: flatten `cm` into one column per key; `et` is carried along unparsed.
      val cmColumns: Seq[Column] =
        CommonFields.map(field => get_json_object($"cm", "$." + field).alias(field))
      val jsonDF3: DataFrame =
        jsonDF2.select((Seq[Column]($"id", $"ap") ++ cmColumns :+ $"et"): _*)
      jsonDF3.show()
    } finally {
      // Release the local Spark resources even if a step above throws.
      spark.stop()
    }
  }
}
下面是在命令行(spark-shell)里的完整操作步骤:
val fileRDD = sc.textFile("file:///opt/kb09file/op.log")
val jsonStrRDD = fileRDD.map(x=>x.split('|')).map(x=>(x(0),x(1)))
val jsonRDD = jsonStrRDD.map(x=>{var jsonStr=x._2; jsonStr = jsonStr.substring(0,jsonStr.length-1); jsonStr+",\"id\":\""+x._1+"\"}" })
val jsonDF = jsonRDD.toDF
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
val jsonDF2 = jsonDF.select(get_json_object($"value","$.cm").alias("cm"),get_json_object($"value","$.ap").alias("ap"),get_json_object($"value","$.et").alias("et"),get_json_object($"value","$.id").alias("id"))
val jsonDF3 = jsonDF2.select($"id",$"ap",
  get_json_object($"cm","$.ln").alias("ln"),
  get_json_object($"cm","$.sv").alias("sv"),
  get_json_object($"cm","$.os").alias("os"),
  get_json_object($"cm","$.g").alias("g"),
  get_json_object($"cm","$.mid").alias("mid"),
  get_json_object($"cm","$.nw").alias("nw"),
  get_json_object($"cm","$.l").alias("l"),
  get_json_object($"cm","$.vc").alias("vc"),
  get_json_object($"cm","$.hw").alias("hw"),
  get_json_object($"cm","$.ar").alias("ar"),
  get_json_object($"cm","$.uid").alias("uid"),
  get_json_object($"cm","$.t").alias("t"),
  get_json_object($"cm","$.la").alias("la"),
  get_json_object($"cm","$.md").alias("md"),
  get_json_object($"cm","$.vn").alias("vn"),
  get_json_object($"cm","$.ba").alias("ba"),
  get_json_object($"cm","$.sr").alias("sr"),
  $"et")
val jsonDF4 = jsonDF3.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",
  from_json($"et",ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))).alias("event"))
val jsonDF5 = jsonDF4.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",
  explode($"event").alias("event"))
val jsonDF6 = jsonDF5.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",
  $"event.ett",$"event.en",$"event.kv")
val jsonDF7=
jsonDF6.select($"id",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"kv")
val jsonDFloading =
jsonDF6.filter(x=>x.getAs("en")=="loading").select($"en",
  get_json_object($"kv","$.extend2").alias("extend2"),
  get_json_object($"kv","$.loading_time").alias("loading_time"),
  get_json_object($"kv","$.action").alias("action"),
  get_json_object($"kv","$.extend1").alias("extend1"),
  get_json_object($"kv","$.type").alias("type"),
  get_json_object($"kv","$.type1").alias("type1"),
  get_json_object($"kv","$.loading_way").alias("loading_way"))
val jsonDFad =
jsonDF6.filter(x=>x.getAs("en")=="ad").select($"en",
  get_json_object($"kv","$.activityId").alias("activityId"),
  get_json_object($"kv","$.displayMills").alias("displayMills"),
  get_json_object($"kv","$.entry").alias("entry"),
  get_json_object($"kv","$.action").alias("action"),
  get_json_object($"kv","$.contentType").alias("contentType"))
val jsonDFnotification =
jsonDF6.filter(x=>x.getAs("en")=="notification").select($"en",
  get_json_object($"kv","$.ap_time").alias("ap_time"),
  get_json_object($"kv","$.action").alias("action"),
  get_json_object($"kv","$.type").alias("type"),
  get_json_object($"kv","$.content").alias("content"))
val jsonDFcomment =
jsonDF6.filter(x=>x.getAs("en")=="comment").select($"en",
  get_json_object($"kv","$.p_comment_id").alias("p_comment_id"),
  get_json_object($"kv","$.addtime").alias("addtime"),
  get_json_object($"kv","$.praise_count").alias("praise_count"),
  get_json_object($"kv","$.other_id").alias("other_id"),
  get_json_object($"kv","$.comment_id").alias("comment_id"),
  get_json_object($"kv","$.reply_count").alias("reply_count"),
  get_json_object($"kv","$.userid").alias("userid"),
  get_json_object($"kv","$.content").alias("content"))
val jsonDFpraise =
jsonDF6.filter(x=>x.getAs("en")=="praise").select($"en",
  get_json_object($"kv","$.target_id").alias("target_id"),
  get_json_object($"kv","$.id").alias("id"),
  get_json_object($"kv","$.type").alias("type"),
  get_json_object($"kv","$.add_time").alias("add_time"),
  get_json_object($"kv","$.userid").alias("userid"))
最后
以上就是明理小鸽子为你收集整理的日志文件完成数据清洗的全部内容,希望文章能够帮你解决日志文件完成数据清洗所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复