概述
ID-MAPPING
总目标: 将每一个用户映射到一个全局唯一的GUID上(global unique id)
1.确定唯一GUID(userid)的相关逻辑
-
1.日志有account
-
--> guid就是该账号在业务库中对应userid,userid在业务系统的业务库里
-
-
2.日志没有account,设备账号绑定关系权重表能找到
-
--> 根据空账号设备的权重来决定日志归属,得到权重表中的account,并找到业务库的userid
-
-
3.日志没有account,设备账号绑定关系权重表不能找到,即从来没有注册过
-
--> guid就是50亿往上不断递增的一个值,生成新的userid存入空设备account和guid映射表中
-
2.总体的维护和查询逻辑
3.设备账号绑定关系权重表
3.1设备账号绑定关系权重表的模型设计
-
设备号 deviceid
-
账号 account
-
权重 score
-
最后登录日期 last_login
3.2表数据来源哪里?
ods层的日志数据信息和前一天的账号绑定关系权重表
3.3是一次性的计算还是滚动更新的计算?
是更新滚动的计算。
根据 今天经去重处理的行为日志数据信息(ods表中) + 前一天的账号绑定关系权重表 --> 今天的账号绑定关系权重表 --> 删掉前一天的账号绑定关系权重表
权重score计算 包括 权重随时间衰减等机制和加分正比会话数
3.4用SQL怎么实现?
今天经去重处理的行为日志数据信息(ods表中) + 前一天的账号绑定关系权重表,这两张表来一个满外连接,
左表有,右表没有的说明是新用户,更新右表的设备id、账户(大值自增,新的userid)、权重和最后登录时间;
左表没有,右表有的说明用户单日无操作,降低右表的权重;
左表有,右表也有说明用户进行了操作,增加右表的权重、更改最后登陆时间
3.5设备账号绑定关系权重表过程示意图
3.6行为日志数据信息(ods表中)作为连接表存在的问题
由于是行为日志,相同deviceid和account的组合的行为会有很多种,外连接后数据会急剧膨胀。我们只需要确定行为日志中deviceid和account的组合确实出现过,而不关心出现了几次,对此二者的组合可以做一个groupby去重处理,并统计deviceid和account的组合中sessionID的个数来确定权重加分的具体值
3.7应该属于数仓的哪一层
因为参与形成该表的行为日志表经过了分组聚合,所以将该表放在dws层。
3.8维护SQL
INSERT INTO TABLE dws.mall_app_device_account_bind PARTITION(dt = '${dt}')
-- 满连接两张表并写逻辑实现关系绑定表的更新
SELECT
NVL(o1.device_id, o2.device_id) AS device_id, -- 哪边不为空取哪边
NVL(o1.account, o2.account) AS account,
CASE
WHEN o1.device_id IS NOT NULL AND o2.device_id IS NOT NULL THEN o1.weight + o2.weight
WHEN o1.device_id IS NOT NULL AND o2.device_id IS NULL THEN o1.weight
ELSE o2.weight * 0.5
END AS weight,
IF(o1.device_id IS NOT NULL, '${dt}', o2.last_login) AS last_login
FROM(
-- 从日志文件提取出来的当天聚合信息(包括同一设备,账号组合出现的会话次数)
-- 用来更新前一天的关系绑定表
SELECT
deviceid AS device_id,
account,
COUNT(DISTINCT sessionid) * 10 AS weight
FROM ods.mall_app_log
WHERE dt = '${dt}' AND TRIM(account) != '' AND account IS NOT NULL
GROUP BY deviceid, account
) o1
FULL JOIN(
-- 前一天的关系绑定表
SELECT
device_id,
account,
weight,
last_login
FROM dws.mall_app_device_account_bind
WHERE dt = '${dt_pre}'
) o2
ON CONCAT(o1.device_id, o1.account) = CONCAT(o2.device_id, o2.account);
4.空设备account和guid映射表建表与维护SQL
4.1空设备account和guid映射表的模型设计
-
device_id
-
user_id
4.2表数据来源哪里?
对于某条行为日志,如果经过3次过滤都没有得到guid,其device_id被记录并且算法赋予其一个全局唯一的user_id作为guid。
4.3是一次性的计算还是滚动更新的计算?
是每天进行的一次性计算,计算完毕后不再被修改,每天的数据都进行追加。查询时需要关联所有历史日期的所有表,这些特点区别于设备账号关系绑定表。
4.4应该属于数仓的哪一层
放在dws层
4.5瘦身计划
该表需要每隔若干天进行一次瘦身,去关联设备账户关系绑定表,因为若干天后有些空帐户设备已经登录了账号,应该从表中去除
4.6维护SQL
SET hive.cli.print.header=flase;
SELECT
NVL(MAX(user_id),1000000000)
FROM dws.mall_app_device_tmpid
WHERE dt = '${dt_pre}'
INSERT INTO TABLE dws.mall_app_device_tmpid PARTITION(dt = '${dt}')
SELECT
o1.device_id,
-- 5.为新的空账号的设备id,在前日最大user_id基础上,加上递增user_id
ROW_NUMBER() OVER() + ${max_id} AS user_id
FROM
-- 1.从昨日的行为日志中,得到所有不带account的设备id
(
SELECT
deviceid AS device_id
FROM ods.mall_app_log
WHERE dt = '${dt}'
GROUP BY deviceid
HAVING MAX(IF(TRIM(account) = '', NULL, account)) IS NULL
) o1
-- 2.将上面结果与昨日设备账号关系绑定表关联,得到还是找不到account的设备id
LEFT JOIN
(
SELECT
device_id
FROM dws.mall_app_device_account_bind
WHERE dt = '${dt}'
) o2
ON
o1.device_id = o2.device_id
-- 3.将上面结果与前日空账号的设备id和user_id的映射表关联,得到还是找不到account的设备id
LEFT JOIN
(
SELECT
device_id
FROM dws.mall_app_device_tmpid
-- WHERE dt = '${dt_pre}' -- 并不像设备账号关系绑定表一样是一天一个的版本迭代,该表所有数据都作为匹配条件
) o3
ON
o1.device_id = o3.device_id
WHERE
o2.device_id IS NULL AND o3.device_id IS NULL
注意:
-
并不像设备账号关系绑定表一样是一天一个的版本迭代,该表所有数据都作为匹配条件
-
该表需要每隔若干天进行一次瘦身,去关联设备账户关系绑定表,因为若干天后有些空帐户设备已经登录了账号,应该从表中去除
关于空账户设备和guid映射表脚本(算是一个优化点了,可以当素材准备 )
需要注意的是,原本的sql里边为了取到当前最大的guid,在最后用了一个外连接关联,这样做效率很低,当我们使用shell脚本时可以将当前最大的guid取出赋值给变量,上边的窗口函数可以直接加上该变量值。同时少了最底下的JOIN,还可以减少一层select
但是,如下代码块直接执行想取到sql1的值,会打印出一大堆的信息,而我们只关心查询的那一个结果,好赋值给变量接收,于是用-S静默运行,-v反选以精简打印。但是我的hive版本没有这个问题
hive -S -e "${sql1}" | grep -v "INFO" | grep -v "WARN"
我的问题是输出将表头一起打印出来,导致分隔符无法确定。于是用SET hive.cli.print.header=flase;指令将表头去除
sql1="
SET hive.cli.print.header=flase;
SELECT
NVL(MAX(user_id),1000000000) AS max_userid
FROM dws.mall_app_device_tempid
WHERE dt = '${dt_pre}';
"
${HIVE_HOME}/bin/hive -e "${sql1}"
实测不添加-S,-v,输出也是1000000000
5.生成全局用户唯一标识所用表的维护流程图
由于维护表一定在更新guid之前,所以事实上维护和查值走的是完全独立的两个路线,换句话说,查值的时候只会走Y的情况,维护的时候由于IS NULL的限制只会走N的情况。
6.模拟MySQL业务库中的表
生产环境中直接使用业务库的表即可,这里做一个模拟
对于第一天的日志数据,用户都是新用户,distinct提取出account生成模拟业务表,用rownum给account打上模拟userid
对于之后的日志需要和当日前一日的模拟业务表关联,进行模拟更新
#!/bin/bash
#模拟每天滚动新增的用户信息表
#注意生产条件下直接导入业务库中相关的表即可
export HIVE_HOME=/opt/tools/hive-3.1.2/
#建表
#sql0="
#CREATE TABLE IF NOT EXISTS dwd.user_reg_info(
# account STRING,
# user_id BIGINT
#)
#PARTITIONED BY (dt STRING)
#STORED AS orc
#TBLPROPERTIES('orc.compress'='snappy')
#;
#"
#
#${HIVE_HOME}/bin/hive -e "${sql0}"
dt=$(date -d'-1 day' +%Y-%m-%d)
dt_pre=$(date -d"${dt} -1 day" +%Y-%m-%d)
# 脚本调用者传入指定日期,则dt为指定日期
if [ $1 ];then
dt=$1
fi
#查询前日最大user_id
sql1="
SET hive.cli.print.header=flase;
SELECT
NVL(MAX(user_id),0)
FROM dwd.user_reg_info
WHERE dt = '${dt_pre}'
"
max_userid=$(${HIVE_HOME}/bin/hive -e "${sql1}")
#每日更新
sql2="
INSERT INTO TABLE dwd.user_reg_info PARTITION(dt = '${dt}')
SELECT
account,
row_number() over() + ${max_userid} AS user_id
FROM ods.mall_app_log
WHERE dt = '${dt}'
GROUP BY account
HAVING TRIM(account) != '' AND account IS NOT NULL
"
${HIVE_HOME}/bin/hive -e "${sql2}"
#${HIVE_HOME}/bin/hive --auxpath /opt/tools/hive-3.1.2/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.2.jar -S -e "$sql2"
if [ $? -eq 0 ];then
echo "数仓任务执行报告,日期${dt},模拟滚动用户信息表 成功"
echo "数仓任务执行报告,日期${dt},模拟滚动用户信息表 成功" | mail -s '数仓平台,任务成功通知' hjyuan925@foxmail.com
exit 0
else
echo "数仓任务执行报告,日期${dt},模拟滚动用户信息表 失败"
echo "数仓任务执行报告,日期${dt},模拟滚动用户信息表 失败" | mail -s '数仓平台,任务失败通知' hjyuan925@foxmail.com
exit 1
fi
7.基于以上3张表标注guid的具体逻辑
7.1标注guid实现逻辑之HQL
示意图恰好可以用上边的维护流程图来描述
由上图可以写出用hql实现的查询逻辑,hql可以实现多层子查询的复杂逻辑
-- dwd目标表建表:
create table dwd.mall_applog_detail(
guid bigint
, account string
, app_id string
, app_version string
, carrier string
, device_id string
, device_type string
, event_id string
, ip string
, latitude double
, longitude double
, net_type string
, os_name string
, os_version string
, properties map<string,string>
, release_channel string
, resolution string
, session_id string
, ts bigint
, new_session_id string
, province string
, city string
, region string
)
partitioned by (dt string)
stored as orc
tblproperties('orc.compress'='snappy')
;
-- guid标注
-- 先将日志数据分为两部分
-- 1. 有账号信息的数据 ==> part1
-- 2. 没有账号信息的数据 ==> part2
-- part1 对有账号信息的数据,关联 "用户注册信息表" ,得到 user_id 作为 guid
-- part2 对无账号信息的数据
-- 先从 "设备账号绑定表" 中,取出每个 设备对应的权重最大的账号 ==> tmp1
-- 用tmp1 关联 "用户注册信息表" ,得到 user_id ==> tmp2
-- 拿着 part2 关联 tmp2 关联 "空设备id映射表"
-- 取数:优先用 tmp2的user_id,次之用 "空设备id映射表"的user_id ,作为 guid
-- 最后,把part1 UNION ALL part2
-- 建议: 此处一定要自己弄的很熟
-- 面试的时候,可以找机会重点描述 -- 涉及到逻辑的优化,也能体现你平常开发中的任务的逻辑复杂度
-- 如果用spark sql 来写,更灵活,对于没账号的数据,可以现在设备账号绑定表找到帐号,和有账号的数据一同关联用户注册信息表
-- 而且因为逻辑中涉及到要反复对一份数据进行运算,可充分利用spark的cache机制来提高效率
-- 当天的日志有账号的数据
INSERT INTO TABLE dwd.mall_app_detail PARTITION(dt='2022-03-16')
SELECT
o2.user_id
,o1.account
,o1.app_id
,o1.app_version
,o1.carrier
,o1.device_id
,o1.device_type
,o1.event_id
,o1.ip
,o1.latitude
,o1.longitude
,o1.net_type
,o1.os_name
,o1.os_version
,o1.properties
,o1.release_channel
,o1.resolution
,o1.session_id
,o1.ts
,o1.new_session_id
,o1.province
,o1.city
,o1.region
FROM tmp.mall_applog_washed_split_located
WHERE dt='2022-03-16' AND (TRIM(account)!='' OR account IS NOT NULL) o1
LEFT JOIN
dwd.user_reg_info o2
ON o1.account=o2.account
UNION ALL
SELECT
NVL(o2.user_id,o3.user_id) AS user_id --o2的account也可能是空
,NVL(o2.account,NULL) AS account -- 注意,不仅要补user_id,还要补account
,o1.app_id
,o1.app_version
,o1.carrier
,o1.device_id
,o1.device_type
,o1.event_id
,o1.ip
,o1.latitude
,o1.longitude
,o1.net_type
,o1.os_name
,o1.os_version
,o1.properties
,o1.release_channel
,o1.resolution
,o1.session_id
,o1.ts
,o1.new_session_id
,o1.province
,o1.city
,o1.region
,o1.dt
FROM
(
-- 当天的日志没有账号的数据
SELECT
*
FROM tmp.mall_applog_washed_split_located
WHERE dt='2022-03-16' AND (TRIM(account)='' OR account IS NULL) -- 优化!o1表的account不为空的数据不参加join!
)o1
LEFT JOIN
(
SELECT
t1.account,
t1.device_id,
t2.user_id
FROM
(
-- 表中设备上绑定权重最大的账号
SELECT
device_id,
account
FROM(
SELECT
-- 精彩句 分区TopN
device_id,
account,
ROW_NUMBER() OVER(PARTITION BY device_id ORDER BY weight DESC, last_login DESC) AS rn
FROM dws.mall_app_device_account_bind
WHERE dt='2022-03-16'
) o
WHERE rn = 1
)t1
JOIN -- 关联业务库用户信息表,用t1找到的account直接拿到模拟业务表的user_id!
-- 有account一定有user_id
-- 业务库用户信息表
dwd.user_reg_info t2
ON t1.account=t2.account
)o2
ON
o1.device_id = o2.device_id;
LEFT JOIN
dws.mall_app_device_tempid o3-- 3个字段 device_id, user_id, dt
ON
o1.device_id = o3.device_id;
-- 注意不是关联某个日期分期,而是整张表
7.2标注guid实现逻辑之Spark SQL
从上边的逻辑图可以清楚地看到有时候需要join两次模拟的业务库表,效率较低。
改进办法是先执行中路再执行上路,即先找到没有account的设备id,去空账号设备id和uid映射表里找到权重最高的account,获得相对完整的account集合,再回到上游查业务表,这样整个过程只用查一次业务库。示意图如下:
选择使用Spark SQL来完成逻辑,可以优化逻辑,对部分中间集还可以cache缓存来提高效率
package cn.yhjnewbie.dolphin.etl
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object ApplogGuid {
def main(args: Array[String]): Unit = {
if (args.length != 1) {
println(
"""
|usage:必须传入1个参数
| 参数1:待处理的日期,如 2022-03-17
|""".stripMargin)
sys.exit(1)
}
val dt: String = args(0)
val spark: SparkSession = SparkSession.builder()
//.config("spark.sql.shuffle.partitions",2)
.appName("app端日志数据清洗过滤计算")
// .master("local")
.enableHiveSupport()
.getOrCreate()
// 读取hive中经过清洗、过滤、规范化、session分割、地理位置集成之后的临时表
//为避免后边select * 将dt字段带上导致union all的字段数匹配不上,将dt字段从源头上drop掉
val tInterim: Dataset[Row] = spark.read.table("tmp.mall_applog_washed_split_located").drop("dt").where(s"dt='${dt}'")
// 相比hql的优化:能将总需要关联的表缓存
tInterim.cache()
//将账号信息分为两部分,带账号信息 和 不带账号信息的
val tWithAcc: Dataset[Row] = tInterim.where("trim(account)!='' and account is not null")
val tWithoutAcc: Dataset[Row] = tInterim.where("trim(account)='' or account is null")
//不带账号的数据 => 从“设备账号绑定关系表”找账号
// TODO: “设备账号绑定关系表”找到绑定权重最大的账号
//使用sparkSQL就不需要如hql一样写双层select嵌套了,当然也可以写双层
val tBindWithWeight: DataFrame = spark.sql(
s"""
|select
| device_id,
| account,
| row_number() over(partition by device_id order by weight desc,last_login desc) as rn
|from dws.mall_app_device_account_bind
|where dt='${dt}'
|""".stripMargin
).where("rn=1")
// TODO: tWithoutAcc关联tBindWithWeight,找到部分缺失的account
// 由于sparkSQL难以给表取别名,对于有相同字段的表关联使用join的table api会很生疏
// 将要关联的两张表注册成视图,又可以继续写sparkSQL了
tWithoutAcc.createTempView("tWithoutAcc")
tBindWithWeight.createTempView("tBindWithWeight")
val tWithoutAccTryMatch: DataFrame = spark.sql(
"""
|-- 取到绑定关系中的账号
|select
| nvl(t2.account,null) as account
| ,t1.app_id
| ,t1.app_version
| ,t1.carrier
| ,t1.device_id
| ,t1.device_type
| ,t1.event_id
| ,t1.ip
| ,t1.latitude
| ,t1.longitude
| ,t1.net_type
| ,t1.os_name
| ,t1.os_version
| ,t1.properties
| ,t1.release_channel
| ,t1.resolution
| ,t1.session_id
| ,t1.ts
| ,t1.new_session_id
| ,t1.province
| ,t1.city
| ,t1.region
|
|from tWithoutAcc t1 left join tBindWithWeight t2 on t1.device_id = t2.device_id
|
|""".stripMargin)
tWithoutAccTryMatch.cache() //对于这种反复用(where条件分裂为两个以上的表)可以缓存以提高新能
// TODO 将上边的tWithoutAccTryMatch数据分为有无找到账号的两部分
val tWithoutAccMatchAcc: Dataset[Row] = tWithoutAccTryMatch.where("account is not null")
val tWithoutAccUnMatchAcc: Dataset[Row] = tWithoutAccTryMatch.where("account is null")
// TODO 本来就有账号的 union all 找到帐号的数据,再去关联用户注册信息表,得到user_id。这一步是spark SQL和HQL操作的不同,spark SQL只需要关联一次用户注册信息表
tWithoutAccMatchAcc.unionAll(tWithAcc).createTempView("tWithAccUnion") //tWithAccFinally所有的数据都有account,必定能在注册信息表中找到user_id
// 记住一点:有account必定有对应的user_id
val tWithAccFinally: DataFrame = spark.sql(
"""
|select
| t2.user_id,
| t1.*
|from tWithAccUnion t1
|join
|dwd.user_reg_info t2
|on t1.account = t2.account
|
|""".stripMargin
)
tWithoutAccUnMatchAcc.createTempView("tWithoutAccUnMatchAcc")
val tWithoutAccFinally: DataFrame = spark.sql(
"""
|select
| t2.user_id,
| t1.*
|from tWithoutAccUnMatchAcc t1
|join
|dws.mall_app_device_tmpid t2
|on t1.device_id = t2.device_id
|""".stripMargin
)
// TODO union两部分拥有了user_id的数据,得到完整结果
tWithAccFinally.createTempView("tWithAccFinally")
tWithoutAccFinally.createTempView("tWithoutAccFinally")
spark.sql(
s"""
|insert into table dwd.mall_applog_detail partition(dt='${dt}')
|select *
|from
|tWithAccFinally
|union all
|select *
|from
|tWithoutAccFinally
|
|""".stripMargin
)
spark.close()
}
}
7.启动脚本
#!/bin/bash
#
#
# @desc:app端日志ods层数据guid标注
#
#
#
export SPARK_HOME=/opt/tools/spark-3.0.0/
export HADOOP_HOME=/opt/tools/hadoop-3.1.1/
export HIVE_HOME=/opt/tools/hive-3.1.2/
# 获取脚本运行时的前一日日期
dt=$(date -d'-1 day' +%Y-%m-%d)
# 如果脚本调用者传入了指定日期,则执行指定日期数据的导入
if [ $1 ];then
dt=$1
fi
${SPARK_HOME}/bin/spark-submit
--master yarn
--deploy-mode cluster
--class cn.yhjnewbie.dolphin.etl.ApplogGuid
--conf spark.sql.shuffle.partitions=2
--driver-memory 2g
--driver-cores 1
--executor-memory 2g
--executor-cores 1
--num-executors 2
--queue default
--driver-class-path /driver/mysql-connector-java-5.1.49.jar
/root/dw_etl-1.0.jar ${dt}
if [ $? -eq 0 ];then
echo "app日志数据guid标注任务,执行成功!"
echo "app日志数据guid标注任务,执行成功!" | mail -s '数仓平台-任务通知' hjyuan925@foxmail.com
exit 0
else
echo "app日志数据guid标注任务,执行失败!"
echo "app日志数据guid标注任务,执行失败!" | mail -s '数仓平台-任务通知' hjyuan925@foxmail.com
exit 1
fi
最后
以上就是能干彩虹为你收集整理的生成全局用户标识:ods2dwdID-MAPPING1.确定唯一GUID(userid)的相关逻辑2.总体的维护和查询逻辑3.设备账号绑定关系权重表4.空设备account和guid映射表建表与维护SQL5.生成全局用户唯一标识所用表的维护流程图6.模拟MySQL业务库中的表7.基于以上3张表标注guid的具体逻辑 7.启动脚本的全部内容,希望文章能够帮你解决生成全局用户标识:ods2dwdID-MAPPING1.确定唯一GUID(userid)的相关逻辑2.总体的维护和查询逻辑3.设备账号绑定关系权重表4.空设备account和guid映射表建表与维护SQL5.生成全局用户唯一标识所用表的维护流程图6.模拟MySQL业务库中的表7.基于以上3张表标注guid的具体逻辑 7.启动脚本所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复