复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package day02

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Finds the top-N most popular teachers for each subject.
 *
 * Each input line is parsed as a URL: the first label of the host name is
 * taken as the subject and the path (minus its leading slash) as the teacher.
 * Votes are summed per (subject, teacher) while the data is repartitioned so
 * that every subject gets its own partition; each partition then keeps only
 * its `topN` best entries in a bounded TreeSet instead of sorting everything.
 */
object SubjectAndTeacher04 {

  def main(args: Array[String]): Unit = {
    // Number of teachers to keep per subject.
    val topN = 2

    // Run locally, using all available cores.
    val conf = new SparkConf().setAppName("SubjectAndTeacher04").setMaster("local[*]")
    val sc = new SparkContext(conf)

    try {
      // Read the raw access log from HDFS.
      val lines = sc.textFile("hdfs://hadoop01:9000/sparkTest")

      // Map every line to ((subject, teacher), 1).
      val subjectAndTeacher = lines.map { line =>
        val url = new URL(line)
        val host = url.getHost
        val subject = host.substring(0, host.indexOf("."))
        val teacher = url.getPath.substring(1)
        ((subject, teacher), 1)
      }

      // Collect the distinct subjects on the driver; they define the partitions.
      val subjects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()

      // One partition per subject.
      val subjectPartition = new SubjectPartition(subjects)

      // Sum the votes and shuffle straight into the per-subject partitions.
      val reduced: RDD[((String, String), Int)] =
        subjectAndTeacher.reduceByKey(subjectPartition, _ + _)

      // Each partition now holds exactly one subject: keep its top-N entries.
      reduced.foreachPartition { partition =>
        val ts = new mutable.TreeSet[((String, String), Int)]()(new SubjectOrdering())

        // Bounded insert: once the set grows past topN, evict the worst entry
        // (the last one, because the ordering is descending by votes).
        // An explicit `while (partition.hasNext)` loop would work equally well.
        partition.foreach { item =>
          ts.add(item)
          if (ts.size > topN) {
            ts.remove(ts.last)
          }
        }

        // Printed by whichever JVM runs the task.
        println(ts.toList.toBuffer)
      }
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}

/**
 * Orders ((subject, teacher), votes) entries by votes, highest first.
 *
 * Ties on the vote count are broken by subject and then by teacher. Without
 * the tie-break, two different teachers with the same number of votes would
 * compare as equal and a TreeSet would silently keep only one of them.
 */
class SubjectOrdering extends Ordering[((String, String), Int)] {
  override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
    // Integer.compare avoids the overflow a plain subtraction can produce.
    val byVotes = Integer.compare(y._2, x._2)
    if (byVotes != 0) {
      byVotes
    } else {
      val bySubject = x._1._1.compareTo(y._1._1)
      if (bySubject != 0) bySubject else x._1._2.compareTo(y._1._2)
    }
  }
}

/**
 * Partitioner that assigns every subject its own partition.
 *
 * @param subjects the subjects to partition by; the position of a subject in
 *                 this array becomes its partition index
 */
class SubjectPartition(subjects: Array[String]) extends Partitioner {
  // subject -> partition index
  val rules = new mutable.HashMap[String, Int]()

  // Next partition index to hand out while the rules are being built.
  var index = 0
  for (subject <- subjects) {
    rules += ((subject, index))
    index += 1
  }

  /** One partition per element of `subjects`. */
  override def numPartitions: Int = subjects.length

  /**
   * Returns the partition of a (subject, teacher) key.
   *
   * @throws NoSuchElementException   if the subject was not in `subjects`
   * @throws IllegalArgumentException if the key is not a tuple whose first element is a String
   */
  override def getPartition(key: Any): Int = key match {
    case (subject: String, _) => rules(subject)
    case other =>
      throw new IllegalArgumentException(s"Expected a (subject, teacher) key but got: $other")
  }
}
Java 版本的实现:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package day02;

// NOTE(review): NativeArray is not used anywhere in this file and lives in a
// JDK-internal package; consider removing this import.
import jdk.nashorn.internal.objects.NativeArray;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.net.URL;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeSet;

/**
 * Finds the top-N most popular teachers for each subject.
 *
 * Each input line is parsed as a URL: the first label of the host name is
 * taken as the subject and the path (minus its leading slash) as the teacher.
 * Votes are summed per (subject, teacher) while the data is repartitioned so
 * that every subject gets its own partition; each partition then keeps only
 * its best entries in a bounded TreeSet instead of sorting everything.
 */
public class SubjectAndTeacherDemo {

    /** Number of teachers to keep per subject. */
    private static final int TOP_N = 2;

    public static void main(String[] args) {
        // Run locally, using all available cores.
        SparkConf conf = new SparkConf().setAppName("SubjectAndTeacherDemo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Read the raw access log from HDFS.
            JavaRDD<String> lines = sc.textFile("hdfs://hadoop01:9000/sparkTest");

            // Map every line to ((subject, teacher), 1).
            JavaPairRDD<Tuple2<String, String>, Integer> subjectAndTeacher = lines.mapToPair(
                    new PairFunction<String, Tuple2<String, String>, Integer>() {
                        @Override
                        public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {
                            URL url = new URL(s);
                            String host = url.getHost();
                            String subject = host.substring(0, host.indexOf("."));
                            String teacher = url.getPath().substring(1);
                            return new Tuple2<>(new Tuple2<String, String>(subject, teacher), 1);
                        }
                    });

            // Project each record to its (subject, teacher) key.
            JavaPairRDD<String, String> subjects = subjectAndTeacher.mapToPair(
                    new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, String>() {
                        @Override
                        public Tuple2<String, String> call(Tuple2<Tuple2<String, String>, Integer> tp) throws Exception {
                            return tp._1();
                        }
                    });

            // De-duplicate the subjects on the cluster so that only the distinct
            // names, not every record, are shipped to the driver.
            Object[] subjectArr = subjects.keys().distinct().collect().toArray();

            // One partition per subject.
            MyPartitioner2 partitioner = new MyPartitioner2(subjectArr);

            // Sum the votes and shuffle straight into the per-subject partitions.
            // The partitioner must be passed here; otherwise a partition may mix
            // several subjects and the per-partition top-N below is meaningless.
            JavaPairRDD<Tuple2<String, String>, Integer> reduced = subjectAndTeacher.reduceByKey(
                    partitioner,
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer, Integer integer2) throws Exception {
                            return integer + integer2;
                        }
                    });

            // Each partition now holds exactly one subject: keep its top-N entries.
            reduced.foreachPartition(new VoidFunction<Iterator<Tuple2<Tuple2<String, String>, Integer>>>() {
                @Override
                public void call(Iterator<Tuple2<Tuple2<String, String>, Integer>> tuple2Iterator) throws Exception {
                    TreeSet<Tuple2<Tuple2<String, String>, Integer>> set =
                            new TreeSet<>(new Comparator<Tuple2<Tuple2<String, String>, Integer>>() {
                                @Override
                                public int compare(Tuple2<Tuple2<String, String>, Integer> o1,
                                                   Tuple2<Tuple2<String, String>, Integer> o2) {
                                    // Highest vote count first; Integer.compare cannot overflow.
                                    int byVotes = Integer.compare(o2._2(), o1._2());
                                    if (byVotes != 0) {
                                        return byVotes;
                                    }
                                    // Break ties by subject, then teacher, so that two teachers
                                    // with equal votes are not treated as duplicates by the set.
                                    int bySubject = o1._1()._1().compareTo(o2._1()._1());
                                    if (bySubject != 0) {
                                        return bySubject;
                                    }
                                    return o1._1()._2().compareTo(o2._1()._2());
                                }
                            });

                    // Bounded insert: once the set grows past TOP_N, evict the worst
                    // entry (the last one, because the order is descending by votes).
                    while (tuple2Iterator.hasNext()) {
                        set.add(tuple2Iterator.next());
                        if (set.size() > TOP_N) {
                            set.remove(set.last());
                        }
                    }

                    // Printed by whichever JVM runs the task.
                    Iterator<Tuple2<Tuple2<String, String>, Integer>> iterator = set.iterator();
                    while (iterator.hasNext()) {
                        System.out.println(iterator.next());
                    }
                }
            });
        } finally {
            // Stop the context on the driver, after the job has finished. It must
            // not be referenced inside the closure above: JavaSparkContext is not
            // serializable and stopping it there would abort the running job.
            sc.stop();
        }
    }
}

/**
 * Partitioner that assigns every subject its own partition; the position of a
 * subject in the array passed to the constructor becomes its partition index.
 */
class MyPartitioner2 extends Partitioner {
    // Defaults to an empty array so the no-arg constructor yields zero
    // partitions instead of a NullPointerException in numPartitions().
    Object[] subjectArr = new Object[0];

    // subject -> partition index
    HashMap<String, Integer> map = new HashMap<String, Integer>();

    public MyPartitioner2() {}

    /**
     * @param subjectArr the subjects to partition by; each element's
     *                   toString() is used as the subject name
     */
    public MyPartitioner2(Object[] subjectArr) {
        this.subjectArr = subjectArr;
        int index = 0;
        for (Object obj : subjectArr) {
            map.put(obj.toString(), index);
            index += 1;
        }
    }

    /** One partition per element of the subject array. */
    @Override
    public int numPartitions() {
        return subjectArr.length;
    }

    /**
     * Returns the partition of a (subject, teacher) key.
     *
     * @throws IllegalArgumentException if the key's subject was not registered
     */
    @Override
    @SuppressWarnings("unchecked")
    public int getPartition(Object key) {
        Tuple2<String, String> tuple2 = (Tuple2<String, String>) key;
        String subject = tuple2._1();
        Integer partition = map.get(subject);
        if (partition == null) {
            // Fail with a clear message instead of an unboxing NullPointerException.
            throw new IllegalArgumentException("No partition registered for subject: " + subject);
        }
        return partition;
    }
}
最后
以上就是笨笨寒风最近收集整理的关于“统计每个学科最受欢迎的老师前N名”的全部内容;更多与“统计每个学科最受欢迎的老师”相关的内容,请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复