概述
目录
代码
PageOneStepConvertRateSpark.java
本篇文章记录页面单跳转化率-页面切片生成以及页面流匹配算法实现。
代码
PageOneStepConvertRateSpark.java
package graduation.java.spark.page; import com.alibaba.fastjson.JSONObject; import graduation.java.constant.Constants; import graduation.java.dao.ITaskDAO; import graduation.java.domain.Task; import graduation.java.factory.DAOFactory; import graduation.java.util.DateUtils; import graduation.java.util.ParamUtils; import graduation.java.util.SparkUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.bouncycastle.asn1.crmf.ProofOfPossession; import scala.Int; import scala.Tuple2; import java.util.*; /** * FileName: PageOneStepConvertRateSpark * Author: hadoop * Email: 3165845957@qq.com * Date: 19-3-30 下午8:00 * Description: * 页面单跳转化率模块spark作业 */ public class PageOneStepConvertRateSpark { public static void main(String[] args) { //1.构建SparkConf上下文 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME_PAGE); SparkUtils.setMaster(conf); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = SparkUtils.getSQLContext(sc.sc()); //2.生成模拟数据 SparkUtils.mockData(sc,sqlContext); //3.查询任务、获取任务参数 long taskid = ParamUtils.getTaskIdFromArgs(args,Constants.SPARK_LOCAL_TASKID_PAGE); ITaskDAO taskDAO = DAOFactory.getTaskDAO(); Task task = taskDAO.findById(taskid); if (task == null){ System.out.println(new Date() + ": cannot find this task with id ["+ taskid+ "]"); return; } JSONObject taskParam = JSONObject.parseObject(task.getTaskParam()); //4.查询指定日期内的用户访问数据 JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDateRange(sqlContext,taskParam); // 对用户访问行为数据做一个映射,将其映射为<sessionid,访问行为>的格式 // 咱们的用户访问页面切片的生成,是要基于每个session的访问数据,来进行生成的 // 脱离了session,生成的页面访问切片,是么有意义的 // 举例,比如用户A,访问了页面3和页面5 // 用于B,访问了页面4和页面6 // 漏了一个前提,使用者指定的页面流筛选条件,比如页面3->页面4->页面7 // 你能不能说,是将页面3->页面4,串起来,作为一个页面切片,来进行统计呢 // 当然不行 // 所以说呢,页面切片的生成,肯定是要基于用户session粒度的 JavaPairRDD<String,Row> sessionid2actionRDD = getSessionid2actionRDD(actionRDD); // 对<sessionid,访问行为> RDD,做一次groupByKey操作 // 因为我们要拿到每个session对应的访问行为数据,才能够去生成切片 JavaPairRDD<String,Iterable<Row>> sessionid2actionsRDD = sessionid2actionRDD.groupByKey(); // 最核心的一步,每个session的单跳页面切片的生成,以及页面流的匹配,算法 JavaPairRDD<String,Integer> pageSplitRDD = generateAndMatchPageSplit(sc,sessionid2actionsRDD,taskParam); Map<String, Long> pageSplitPVMap = pageSplitRDD.countByKey(); } /** * 获取<sessionid,用户访问行为>格式的数据 * @param actionRDD 用户访问行为RDD * @return <sessionid,用户访问行为>格式的数据 */ private static JavaPairRDD<String,Row> getSessionid2actionRDD(JavaRDD<Row> actionRDD) { return actionRDD.mapToPair(new PairFunction<Row, String, Row>() { private static final long serailVersionUID = 1L; @Override public Tuple2<String, Row> call(Row row) throws Exception { String sessionid = row.getString(2); return new Tuple2<String,Row>(sessionid,row); } }); } /** * 页面切片生成与匹配算法 * @param sc * @param sessionid2actionsRDD * @param taskParam * @return */ private static JavaPairRDD<String,Integer> generateAndMatchPageSplit(JavaSparkContext sc, JavaPairRDD<String, Iterable<Row>> sessionid2actionsRDD, JSONObject taskParam) { String targetPageFlow = ParamUtils.getParam(taskParam,Constants.PARAM_TARGET_PAGE_FLOW); final Broadcast<String> targetPageFlowBroadcast = sc.broadcast(targetPageFlow); return sessionid2actionsRDD.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Iterable<Row>>, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<String, Integer>> call(Tuple2<String, Iterable<Row>> tuple) throws Exception { //定义返回的list List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); //获取到当前session的访问行为的迭代器 Iterator<Row> iterator = tuple._2.iterator(); // 获取使用者指定的页面流 // 使用者指定的页面流,1,2,3,4,5,6,7 // 1->2的转化率是多少?2->3的转化率是多少? String[] targetPages = targetPageFlowBroadcast.value().split(","); // 这里,我们拿到的session的访问行为,默认情况下是乱序的 // 比如说,正常情况下,我们希望拿到的数据,是按照时间顺序排序的 // 但是问题是,默认是不排序的 // 所以,我们第一件事情,对session的访问行为数据按照时间进行排序 // 举例,反例 // 比如,3->5->4->10->7 // 3->4->5->7->10 // 排序 List<Row> rows = new ArrayList<Row>(); while (iterator.hasNext()){ rows.add(iterator.next()); } Collections.sort(rows, new Comparator<Row>() { @Override public int compare(Row row1, Row row2) { String actionTime1 = row1.getString(4); String actionTime2 = row2.getString(4); Date date1 = DateUtils.parseTime(actionTime1); Date date2 = DateUtils.parseTime(actionTime2); return (int)(date1.getTime() - date2.getTime()); } }); //页面切片的生成,以及页面留的匹配 Long lastPageId = null; for (Row row: rows){ long pageid = row.getLong(3); if (lastPageId == null){ lastPageId = pageid; continue; } // 生成一个页面切片 // 3,5,2,1,8,9 // lastPageId=3 // 5,切片,3_5 String pagePlit = lastPageId+"_"+pageid; // 对这个切片判断一下,是否在用户指定的页面流中 for (int i = 1; i < targetPages.length; i++){ // 比如说,用户指定的页面流是3,2,5,8,1 // 遍历的时候,从索引1开始,就是从第二个页面开始 // 3_2 String targetPageSplit = targetPages[i-1] +"_"+targetPages[i]; if (pagePlit.equals(targetPageSplit)){ list.add(new Tuple2<String,Integer>(pagePlit,1)); break; } } lastPageId = pageid; } return list.iterator(); } } ); } }
最后
以上就是寒冷果汁为你收集整理的92.Spark大型电商项目-页面单跳转化率-页面切片生成以及页面流匹配算法实现的全部内容,希望文章能够帮你解决92.Spark大型电商项目-页面单跳转化率-页面切片生成以及页面流匹配算法实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复