我是靠谱客的博主 英俊镜子,最近开发中收集的这篇文章主要介绍广告统计数据恢复改进及实现,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

最近在做广告效果数据统计优化相关的工作,在优化中发先现有的实时统计方法存在许多的缺点,下面做一下简单的分析:

需求描述:

按天统计广告效果数据:

1).按天统计应用、位置、内容源上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等
2).按天统计应用、位置上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等

按小时统计广告效果数据:

1).按小时统计应用、位置、内容源上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等
2).按小时统计应用、位置上的广告请求数、请求人数、曝光人数、曝光次数、点击人数、点击次数等

现有解决方案

目前广告效果统计采用Spark Streaming整合MQ做实时在线计算,实时消费MQ并按业务需求对数据进行清洗并存储到Hive库中,Spark Streaming 根据任务时间间隔,计算出所消费的MQ在这段时间范围内所包含的业务统计时间(oper_time),并根据统计时间从Hive库中读取出这段时间内的数据,按业务需求统计广告位上的请求数、请求人数、曝光、点击、下载等效果数据数据。

1.现方案存在的问题

现有方案存在两个问题:
1).数据出错回复代价较大
该实现方案目前有一些不足之处,当统计业务出现异常的时候,这个时候如果消费MQ的线程是正常运行的,并且对数据进行了清洗写入了Hive库中,但在进计算的时候出现问题,此时虽然MQ数据已经入到Hive库中,但是任务异常并尝试多次后任务自动结束,当任务再次启动并消费MQ,此时消费的MQ中已经不包含上次的业务统计时间了 ,因此这段时间内的数据就丢失了计算,如果需要恢复这段时间的数据,就需要重新注册一个GROUP去消费MQ,但是重新注册GROUP消费MQ的时候,该GROUP会消费对应TOPIC所包含的历史数据,所以不管是恢复一分钟的数据、还是一天的数据,相对来说代价都是一样的。
2).数据量大的时候任务延时较为严重
可以从需求看出,按天统计广告效果数据的时候,因为要对广告位上的这一天的数据都要进行一次计算,也就是说落地的数据都要在Spark Streaming任务时间间隔每次一任务都要重新读取计算,虽然Spark擅长处理大批量的数据,但是在现有集群机器数量有限,任务较多,资源紧张的时候,处理大批量的数据就不可避免的出现任务延时等情况。

2、基于现方案的改进及实现

问题1解决方案如下:
鉴于现方案在恢复数据的时候需要重新消费历史数据,这里采用离线的方式,不消费MQ直接对Hive库中的数据进行读取计算广告统计需求,按天和按小时统计计算逻辑是一致的,只是想要的数据粒度不一样,按天统计只需要对所有数据去重(去除重复上报的数据)根据应用和位置、内容源等进分组计算,人数根据imei、次数根据对应的事件进行区分计算即可,按小时计算的难点就是数据去重,然后把按把oper_time(毫秒级精度)转换成需要计算的粒度(这里统计粒度是十分钟)。
难点分析:
HQL中如果有group by语句那么select出的字段必须在group by 中出现、或者是聚合函数(sum/avg等),如果某些字段不想进行group,但又想在select中出现,Hive提供了collect_set(column)函数,该函数返回的是根据column去重的列表。因为去除重复上报的数据,现在采取的机制是:除了时间(oper_time)字段不一样以外,其他字段一样则去除重复的数据,所以oper_time不能出现在group字段中,但是计算的时候需要oper_time,这时候collect_set(column)函数,就可以使用上了,解决了去重的问题,剩下的一个难点就是把oper_time(毫秒级精度)转换成需要计算的粒度(这里统计粒度是十分钟)。这里使用floor()这个向下取整函数,毫秒级转换为精度为10分钟级别数据算法如下:
oper_time = floor(oper_time/1000/60/10)*60*10
公式解释:除以1000是转换成秒级别精度除以60是转换为分钟级别的粒度,除以10是转换为10分钟界别的,此时取整后就是10分钟级别的,但是相对于妙级别的数据长度,向下取整后数据缩小了600倍所以*600 ,这时候计算出来的数据就是我们需要的粒度为10分钟的时间数据。解决了上面两个难点后,就可以像我们平时写SQL一样根据条件去计算了。

问题2解决方案:
可以看出,是资源与数据量的冲突,如果集群资源充足,则多分配点内存、CPU则可以实时的计算统计,但如果资源不够则可以采用另一种方案,把实时的变为准实时,用离线的方式,半个小时或者一个小时运行一次,这样对机器的压力就小很多了。

附件

完整脚本:

!/usr/bin/env python
 coding=UTF-8

import os
import sys
import getopt

import datetime
import time

##执行命令
def execute_shell(db,hql):
    #将hql语句进行字符转义
    hql = hql.replace(""","'")
    #执行查询,并取得执行的状态和输出
    cmd = HIVE_PATH+"hive -S -e" use "+database";"+hql+"""
    status,output = commands.getstatusoutput(cmd)
    if status !=0:
        return None
    else:
        print "success"
    output = str(output).split("n")
    return output

def execute(db,yyyymmdd):

    sql_comment="from bdl_fdt_ad_ssp_log to adl_fdt_ad_ssp_log_hour_result"
    sql="""
    insert overwrite table adl_fdt_ad_ssp_log_hour_result partition(stat_date=%(yyyymmdd)s)
    select
    api_type,mz_appid,s_mz_id,
    sum(case when oper_type = 'EXPOSURE'  then 1 else 0 end) as expo_times,
    count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples,
    sum(case when oper_type = 'DCLICK'  then 1 else 0 end) as click_times,
    count(distinct case when oper_type = 'DCLICK' then s_imei else null end) as click_peoples,
    sum(case when oper_type = 'QUERY'  then 1 else 0 end) as query_times,
    count(distinct case when oper_type = 'QUERY' then s_imei else null end) as query_peoples,
    sum(case when i_resp_adcount>0  then 1 else 0 end) as i_resp_adcount,
    sum(case when oper_type = 'DOWNLOAD'  then 1 else 0 end) as download_times,
    count(distinct case when oper_type = 'DOWNLOAD' then s_imei else null end) as download_peoples,
    sum(case when oper_type = 'DOWNLOAD_COMPLETED'  then 1 else 0 end) as download_completed_times,
    count(distinct case when oper_type = 'DOWNLOAD_COMPLETED' then s_imei else null end) as download_completed_peoples,
    sum(case when oper_type = 'INSTALL_COMPLETED'  then 1 else 0 end) as install_completed_times,
    count(distinct case when oper_type = 'INSTALL_COMPLETED' then s_imei else null end) as install_completed_peoples,
    sum(case when oper_type = 'CLOSE'  then 1 else 0 end) as close_times,
    count(distinct case when oper_type = 'CLOSE' then s_imei else null end) as close_peoples,
    sum(case when oper_type = 'QUERY' and addition = 'true' then 1 else 0 end) as query_addition_times,
    sum(case when oper_type = 'EXPOSURE' and addition = 'true' then 1 else 0 end) as expo_addition_times,
 oper_time as stat_time
   from (select s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date,floor(collect_set(oper_time)[0]/600000)*600 oper_time  from bdl_fdt_ad_ssp_log  where stat_date = %(yyyymmdd)s group by s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date ) bdl where bdl.stat_date = %(yyyymmdd)s group by bdl.api_type,bdl.mz_appid,bdl.s_mz_id,oper_time
   union all
   select
   0,mz_appid,s_mz_id,
   sum(case when oper_type = 'EXPOSURE'  then 1 else 0 end) as expo_times,
   count(distinct case when oper_type = 'EXPOSURE' then s_imei else null end) as expo_peoples,
    sum(case when oper_type = 'DCLICK'  then 1 else 0 end) as click_times,
    count(distinct case when oper_type = 'DCLICK' then s_imei else null end) as click_peoples,
    count(distinct case when oper_type = 'QUERY'  then s_request_id else null end) as query_times,
    count(distinct case when oper_type = 'QUERY' then s_imei else null end) as query_peoples,
    if(count(distinct case when oper_type = 'QUERY'  then s_request_id else null end)<sum(case when i_resp_adcount>0  then 1 else 0 end),count(distinct case when oper_type = 'QUERY'  then s_request_id else null end),sum(case when i_resp_adcount>0  then 1 else 0 end)) as i_resp_adcount,
    sum(case when oper_type = 'DOWNLOAD'  then 1 else 0 end) as download_times,
    count(distinct case when oper_type = 'DOWNLOAD' then s_imei else null end) as download_peoples,
    sum(case when oper_type = 'DOWNLOAD_COMPLETED'  then 1 else 0 end) as download_completed_times,
    count(distinct case when oper_type = 'DOWNLOAD_COMPLETED' then s_imei else null end) as download_completed_peoples,
    sum(case when oper_type = 'INSTALL_COMPLETED'  then 1 else 0 end) as install_completed_times,
    count(distinct case when oper_type = 'INSTALL_COMPLETED' then s_imei else null end) as install_completed_peoples,
    sum(case when oper_type = 'CLOSE'  then 1 else 0 end) as close_times,
    count(distinct case when oper_type = 'CLOSE' then s_imei else null end) as close_peoples,
    sum(case when oper_type = 'QUERY' and addition = 'true' then 1 else 0 end) as query_addition_times,
    sum(case when oper_type = 'EXPOSURE' and addition = 'true' then 1 else 0 end) as expo_addition_times,
    oper_time as stat_time
    from (select s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date,floor(collect_set(oper_time)[0]/600000)*600 oper_time
    from bdl_fdt_ad_ssp_log  where stat_date = %(yyyymmdd)s group by s_mz_id,s_imei,oper_type,api_type,mz_appid,s_request_id,i_resp_adcount,addition,stat_date  ) bdl 
    where bdl.stat_date = %(yyyymmdd)s group by bdl.mz_appid,bdl.s_mz_id,oper_time;
    """%{'yyyymmdd':yyyymmdd}
   execute_shell(db,hql)

if __name__ == "__main__":

    #运行计算sql脚本
    execute(db,20161203)

###数据原始表
CREATE TABLE `bdl_fdt_ad_ssp_log`(
  `s_mz_id` string, 
  `s_imei` string, 
  `oper_type` string, 
  `api_type` string, 
  `mz_appid` string, 
  `oper_time` bigint, 
  `s_request_id` string, 
  `i_resp_adcount` bigint, 
  `i_resp_code` bigint, 
  `addition` string)
PARTITIONED BY ( 
  `stat_date` bigint)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION

###统计结果表
CREATE TABLE `adl_fdt_ad_ssp_log_hour_result`(
  `api_type` string, 
  `mz_appid` string, 
  `s_mz_id` string, 
  `expo_times` bigint, 
  `expo_peoples` bigint, 
  `click_times` bigint, 
  `click_peoples` bigint, 
  `query_times` bigint, 
  `query_peoples` bigint, 
  `i_resp_adcount` bigint, 
  `download_times` bigint, 
  `download_peoples` bigint, 
  `download_completed_times` bigint, 
  `download_completed_peoples` bigint, 
  `install_completed_times` bigint, 
  `install_completed_peoples` bigint, 
  `close_times` bigint, 
  `close_peoples` bigint, 
  `query_addition_times` bigint, 
  `expo_addition_times` bigint, 
  `stat_time` bigint)
PARTITIONED BY ( 
  `stat_date` bigint)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'

最后

以上就是英俊镜子为你收集整理的广告统计数据恢复改进及实现的全部内容,希望文章能够帮你解决广告统计数据恢复改进及实现所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部