概述
- 使用 Spark 实现热榜统计
package com.changhong.laodixiao.HotCalculate;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public final class HotStatistics {
private static Logger logger = Logger.getLogger(HotStatistics.class);
public static void main(String[] args) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println(df.format(new Date()));
String EPGInfoDir = "/data/laodi.xiao/zzzHotCal/getEPG/data/";
String EPGInfoStarDir = "/data/laodi.xiao/zzzHotCal/getEPGStar/data/";
String UserActionDataDir = "/data/laodi.xiao/zzzHotCal/getUserAction/data/";
String OutputDir = "/data/laodi.xiao/zzzHotCal/output/data/";
if(args.length==4){
EPGInfoDir = args[0];
EPGInfoStarDir = args[1];
UserActionDataDir = args[2];
OutputDir = args[3];
}else {
System.err.println("Usage: $JAVA_HOME/java -jar HotCalcualte.jar EPGInfoDir EPGInfoStarDir UserActionDataDir OutputDir");
}
logger.info("<--------------------------------------------- Start Calculate Star Film List ...... ------------------------------->");
HotStatistics.getStarFilmListUsingSpark(EPGInfoStarDir, OutputDir);
logger.info("<--------------------------------------------- Star Film List Calculated ! ------------------------------->");
logger.info("<--------------------------------------------- Start Calculate Best PlayMedia Lists for One Day ...... ------------------------------->");
HotStatistics.getHotStatisticsForOneDayUsingSpark(EPGInfoDir, UserActionDataDir, OutputDir);
logger.info("<--------------------------------------------- Best PlayMedia Lists for One Day Calculated ! ------------------------------->");
System.out.println(df.format(new Date()));
}
public static void getStarFilmListUsingSpark(String ePGInfoStarDir, String outputDir) {
logger.info("<--------------------------------------------- Step One: Initializing the SparkContext ...... ------------------------------->");
SparkConf conf = new SparkConf().setMaster("local").setAppName("HotList statistics using Spark");
JavaSparkContext sc = new JavaSparkContext(conf);
logger.info("SparkContext initialized !");
logger.info("<--------------------------------------------- Step Two: Reading data from " +ePGInfoStarDir+ " ...... ------------------------------->");
// EPGInfoStar: star,cid,weight
JavaRDD<String> EPGInfoStarRDD = sc.textFile(ePGInfoStarDir+"EPGStarInfo.txt");
JavaPairRDD<String, Iterable<String>> starFilmList = EPGInfoStarRDD.distinct()
.mapToPair( //JavaRDD<String>转化为JavaPairRDD<Double,String> (weight, "star,cid,weight")
new PairFunction<String, Double, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Double, String> call(String arg0){
return new Tuple2<Double, String>(Double.parseDouble(arg0.split(",")[2]), arg0);
}
}
)
.sortByKey(false) //按照权重排序
.mapToPair(
new PairFunction<Tuple2<Double,String>, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(Tuple2<Double, String> arg0){
return new Tuple2<String, String>(arg0._2().split(",")[0], arg0._2().split(",")[1]+","+arg0._2().split(",")[2]);
}
}
)
.groupByKey()
.persist(StorageLevel.MEMORY_ONLY());
//输出统计结果
starFilmList.repartition(1).saveAsTextFile(outputDir+"starFilmList/");
sc.close();
}
public static void getHotStatisticsForOneDayUsingSpark(String ePGInfoDir, String userActionDataDir, String outputDir) {
logger.info("<--------------------------------------------- Step One: Initializing the SparkContext ...... ------------------------------->");
SparkConf conf = new SparkConf().setMaster("local").setAppName("HotList statistics using Spark");
JavaSparkContext sc = new JavaSparkContext(conf);
logger.info("SparkContext initialized !");
logger.info("<--------------------------------------------- Step Two: Reading data from ./data/* ...... ------------------------------->");
// EPGInfo: cid, player, categorys_name, model, vip
JavaRDD<String> EPGInfoRDD = sc.textFile(ePGInfoDir + "EPGInfo.txt");
// UserInfoTans: cid, mac, p_log_date
JavaRDD<String> UserInfoRDD = sc.textFile(userActionDataDir + "UserInfo01Day.txt");
logger.info("Data prepared !");
EPGInfoRDD.persist(StorageLevel.MEMORY_ONLY());
UserInfoRDD.persist(StorageLevel.MEMORY_ONLY());
//将UserInfo按照cid划分为key-value PairRDD
PairFunction<String, String, String> USerKeyDataFunction = new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String x){
String[] tmp = x.split(",", -1);
return new Tuple2<String, String>(tmp[0], tmp[1]+","+tmp[2]);
}
};
JavaPairRDD<String, String> userInfoPairRDD = UserInfoRDD
.filter(
new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String arg0)throws Exception {
return (!arg0.contains("GC") && arg0.length()>10);
}
}
)
.mapToPair(USerKeyDataFunction)
.distinct();
logger.info("<--------------------------------------------- Step Three: Get the statistics ...... ------------------------------->");
logger.info("tt<------------------------------------- SubStep Three-One: Get the 'model Best Visit List' statistics ...... ------------------------------->");
// EPGInfo: cid, player, categorys_name, model, vip
EPGInfoRDD
// UserInfoTans: cid, mac, p_log_date userInfoPairRDD
//tv的EPG数据
JavaRDD<String> tvEPGInfoRDD = EPGInfoRDD.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String arg0) throws Exception {
return arg0.contains("tv");
}
});
//movie的EPG数据
JavaRDD<String> movieEPGInfoRDD = EPGInfoRDD.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(String arg0) throws Exception {
return arg0.contains("movie");
}
});
JavaPairRDD<String, Iterable<String>> tvHotRdd = userInfoPairRDD //UserInfoTans: cid,
mac, p_log_date
.leftOuterJoin( //获取 [cid, (mac, p_log_date, categorys_name)]
tvEPGInfoRDD //EPGInfo: cid, player, categorys_name, model, vip
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String arg0)throws Exception {
String[] tmp = arg0.split(",", -1);
return new Tuple2<String, String>(tmp[0], tmp[2]);
}
}
)
)
.mapToPair( //获取(cid,categorys_name,1)
new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Optional<String>>> arg0)throws Exception {
try{
return new Tuple2<String, Integer>(arg0._1+","+arg0._2._2.get(), 1);
}catch(Exception e){
return new Tuple2<String, Integer>("未知,未知", 1);
}
}
}
)
.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer arg0, Integer arg1)throws Exception {
return arg0+arg1;
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0)throws Exception {
return new Tuple2<Integer, String>(arg0._2, arg0._1);
}
}
)
.sortByKey(false) //(n, cid,categorys_name)
.mapToPair(
new PairFunction<Tuple2<Integer,String>, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(Tuple2<Integer, String> arg0)throws Exception {
String[] tmp = arg0._2.split(",", -1);
return new Tuple2<String, String>(tmp[1], tmp[0]+","+arg0._1.toString());
}
}
)
.groupByKey()
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ 电视剧热榜 ------------------------------------------>");
tvHotRdd.repartition(1).saveAsTextFile(outputDir + "oneDay/modelHot/tv");
JavaPairRDD<String, Iterable<String>> movieHotRdd = userInfoPairRDD //UserInfoTans: cid,
mac, p_log_date
.leftOuterJoin( //获取 [cid, (mac, p_log_date, categorys_name)]
movieEPGInfoRDD //EPGInfo: cid, player, categorys_name, model, vip
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String arg0)throws Exception {
String[] tmp = arg0.split(",", -1);
return new Tuple2<String, String>(tmp[0], tmp[2]);
}
}
)
)
.mapToPair( //获取(cid,categorys_name,1)
new PairFunction<Tuple2<String,Tuple2<String,Optional<String>>>, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(Tuple2<String, Tuple2<String, Optional<String>>> arg0)throws Exception {
try{
return new Tuple2<String, Integer>(arg0._1+","+arg0._2._2.get(), 1);
}catch(Exception e){
return new Tuple2<String, Integer>("未知,未知", 1);
}
}
}
)
.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer arg0, Integer arg1)throws Exception {
return arg0+arg1;
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0)throws Exception {
return new Tuple2<Integer, String>(arg0._2, arg0._1);
}
}
)
.sortByKey(false) //(n, cid,categorys_name)
.mapToPair(
new PairFunction<Tuple2<Integer,String>, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(Tuple2<Integer, String> arg0)throws Exception {
String[] tmp = arg0._2.split(",", -1);
return new Tuple2<String, String>(tmp[1], tmp[0]+","+arg0._1.toString());
}
}
)
.groupByKey()
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------- 电影热榜 ------------------------------------------->");
movieHotRdd.repartition(1).saveAsTextFile(outputDir + "oneDay/modelHot/movie");
logger.info("tt<------------------------------------- SubStep Three-Two: Get the 'VIP Best Visit List' statistics ...... ------------------------------->");
//recordVipPairRDD: cid,(mac, p_log_date, vip)
JavaPairRDD<String, Tuple2<String, Optional<String>>> recordVipPairRDD = userInfoPairRDD
.leftOuterJoin(
EPGInfoRDD
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String x){
return new Tuple2<String, String>(x.split(",", -1)[0], x.split(",", -1)[4]);
}
}
)
.distinct()
);
// cid,(mac, p_log_date, vip)
JavaPairRDD<Integer, String> vipHotRdd = recordVipPairRDD
.mapValues( //获取JavaPairRDD<String,String> (cid,vip)
new Function<Tuple2<String, Optional<String>>, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Tuple2<String, Optional<String>> value) {
try {
return Integer.parseInt(value._2().get());
} catch (Exception e) {
return Integer.parseInt("-1");
}
}
}
)
.filter( //获取VIP影片
new Function<Tuple2<String, Integer>, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(Tuple2<String, Integer> arg0) {
return (arg0._2()==1);
}
}
)
.mapToPair( //将JavaPairRDD<String, Integer> (cid, vip) 转换为JavaPairRDD<String, Integer> (cid, 1)
new PairFunction<Tuple2<String, Integer>, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(Tuple2<String, Integer> arg0) {
return new Tuple2<String, Integer>(arg0._1(), 1);
}
}
)
.reduceByKey( //JavaRDD<String> (cid, 1)按键累加
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer,String> call(Tuple2<String,Integer> modelHoTuple2){
return new Tuple2<Integer,String>(modelHoTuple2._2(),modelHoTuple2._1());
}
}
)
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ VIP热榜 ------------------------------------------>");
vipHotRdd.repartition(1).saveAsTextFile(outputDir + "oneDay/vipHot");
//
ListIterator<Tuple2<Integer, String>> vipHotRddRows = vipHotRdd.collect().listIterator();
//
while(vipHotRddRows.hasNext()){
//
Tuple2<Integer, String> dInteger = vipHotRddRows.next();
//
System.out.println(dInteger);
//
}
logger.info("tt<------------------------------------- SubStep Three-Three: Get the 'player Best Visit List' statistics ...... ------------------------------->");
//EPGInfo: cid, player, categorys_name, model, vip
//UserInfoTans: cid, mac, p_log_date
//recordPlayerPairRDD: cid,(mac, p_log_date, player)
JavaPairRDD<String, Tuple2<String, Optional<String>>> recordPlayerPairRDD = userInfoPairRDD
.leftOuterJoin(
EPGInfoRDD
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String x){
return new Tuple2<String, String>(x.split(",", -1)[0], x.split(",", -1)[1]);
}
}
)
.distinct()
);
// cid,(mac, p_log_date, player)
JavaPairRDD<String, Integer> playerHotRdd = recordPlayerPairRDD
.mapValues( //获取JavaPairRDD<String,String> (cid,player)
new Function<Tuple2<String, Optional<String>>, String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, Optional<String>> value) {
try {
return value._2().get();
} catch (Exception e) {
return "未知";
}
}
}
)
.map( //将JavaPairRDD转换为JavaRDD<String> (player, cid)
new Function<Tuple2<String,String>,String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, String> arg0) throws Exception {
return arg0._2()+ "," +arg0._1();
}
}
)
.mapToPair( //将JavaRDD<String>转换为JavaRDD<String> ((player, cid), 1)
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String x){
return new Tuple2<String, Integer>(x, 1);
}
}
)
.reduceByKey( //JavaRDD<String> ((player, cid), 1)按键(player, cid)累加
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
}
)
.persist(StorageLevel.MEMORY_ONLY());
//腾讯热榜
JavaPairRDD<Integer, String> tencentHotRdd = playerHotRdd
.filter(
new Function<Tuple2<String,Integer>, Boolean>() {
private static final long serialVersionUID = 1L;
public Boolean call(Tuple2<String,Integer> playerHot) throws Exception {
return playerHot._1().contains("tencent");
}
}
)
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer,String> call(Tuple2<String,Integer> playerHoTuple2){
return new Tuple2<Integer,String>(playerHoTuple2._2(),playerHoTuple2._1().split(",", -1)[1]);
}
}
)
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ 腾讯播放源热榜 ------------------------------------------>");
tencentHotRdd.repartition(1).saveAsTextFile(outputDir + "oneDay/playerHot/tencent");
//
ListIterator<Tuple2<Integer, String>> tencentHotRddRows = tencentHotRdd.collect().listIterator();
//
while(tencentHotRddRows.hasNext()){
//
Tuple2<Integer, String> dInteger = tencentHotRddRows.next();
//
System.out.println(dInteger);
//
}
logger.info("tt<------------------------------------- SubStep Three-Four: Get the 'tags Best Visit List' statistics ...... ------------------------------->");
//EPGInfo: cid, player, categorys_name, model, vip
//UserInfoTans: cid, mac, p_log_date
//recordTagsPairRDD: cid,(mac, p_log_date, tag)
JavaPairRDD<String, Tuple2<String, Optional<String>>> recordTagsPairRDD = userInfoPairRDD
.leftOuterJoin(
EPGInfoRDD
.mapToPair(
new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, String> call(String x){
return new Tuple2<String, String>(x.split(",", -1)[0], x.split(",", -1)[2]);
}
}
)
.distinct()
);
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> tagHotPairRDD = recordTagsPairRDD
.mapValues( //获取JavaPairRDD<String,String> (cid,tag)
new Function<Tuple2<String, Optional<String>>, String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, Optional<String>> value) {
try {
return value._2().get();
} catch (Exception e) {
return "未知";
}
}
}
)
.map( //将JavaPairRDD转换为JavaRDD<String> (tag, cid)
new Function<Tuple2<String,String>,String>() {
private static final long serialVersionUID = 1L;
public String call(Tuple2<String, String> arg0) throws Exception {
return arg0._2()+ "," +arg0._1();
}
}
)
.mapToPair( //将JavaRDD<String>转换为JavaRDD<String> ((tag, cid), 1)
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String x){
return new Tuple2<String, Integer>(x, 1);
}
}
)
.reduceByKey( //JavaRDD<String> ((tag, cid), 1)按键(tag, cid)累加
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0+arg1;
}
}
)
.mapToPair( 将JavaRDD<Tuple2<String,String>,Integer>转换为JavaRDD<Integer,Tuple2<String,String>> (1, (cid,tag))
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> arg0){
return new Tuple2<Integer,String>(arg0._2(), arg0._1());
}
}
)
.sortByKey(false)
.mapToPair( //将JavaRDD<Tuple2<String,String>,Integer>转换为JavaRDD<String,Tuple2<String,Integer>> (tag, (cid,1))
new PairFunction<Tuple2<Integer,String>, String, Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Integer, String> arg0){
return new Tuple2<String, Tuple2<String, Integer>>
(arg0._2().split(",")[0], new Tuple2<String, Integer>(arg0._2.split(",")[1], arg0._1()));
}
}
)
.groupByKey() //按照tag进行聚合操作
.sortByKey(false)
.persist(StorageLevel.MEMORY_ONLY());
System.out.println("<------------------------------------ 各标签热榜 ------------------------------------------>");
tagHotPairRDD.repartition(1).saveAsTextFile(outputDir + "oneDay/tagsHot/");
sc.close();
}
}
最后
以上就是安静百合为你收集整理的Spark统计热榜(JAVA版)的全部内容,希望文章能够帮你解决Spark统计热榜(JAVA版)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复