我是靠谱客的博主 笨笨寒风,最近开发中收集的这篇文章主要介绍统计每个学科最受欢迎的老师前N名,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

package day02
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Computes the top-N most popular teachers for every subject.
 *
 * Each input line is parsed as a URL: the host label before the first "."
 * is taken as the subject and the path without its leading "/" as the teacher.
 * Records are shuffled with a [[SubjectPartition]] so that every subject ends
 * up in its own partition, and a bounded TreeSet then keeps only the best
 * `topN` entries per partition instead of sorting the whole partition.
 */
object SubjectAndTeacher04 {
  def main(args: Array[String]): Unit = {
    // Number of teachers to keep for each subject.
    val topN = 2
    // Run locally, using every available core.
    val conf = new SparkConf().setAppName("SubjectAndTeacher04").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Read the raw access records from HDFS.
      val lines = sc.textFile("hdfs://hadoop01:9000/sparkTest")
      // Turn every line into ((subject, teacher), 1).
      // The RDD is cached because it feeds two jobs below (the distinct
      // subject scan and the aggregation); without caching the input would be
      // read and parsed twice.
      val subjectAndTeacher = lines.map(line => {
        val url = new URL(line)
        val host = url.getHost
        val subject = host.substring(0, host.indexOf("."))
        val teacher = url.getPath.substring(1)
        ((subject, teacher), 1)
      }).cache()
      // Collect the distinct subjects on the driver; they define the partitions.
      val subjects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()
      // One partition per subject.
      val subjectPartition = new SubjectPartition(subjects)
      // Sum the votes per (subject, teacher) and, in the same shuffle, route
      // every record to the partition of its subject.
      val reduced: RDD[((String, String), Int)] =
        subjectAndTeacher.reduceByKey(subjectPartition, _ + _)
      // Each partition now holds exactly one subject: pick its top N.
      reduced.foreachPartition(partition => {
        // Sorted set ordered by SubjectOrdering. Entries that compare as equal
        // under that ordering are treated as the same element by the set.
        val ts = new mutable.TreeSet[((String, String), Int)]()(new SubjectOrdering())
        // The partition iterator can be traversed only once, so the top-N
        // selection happens in this single pass. The set never holds more
        // than topN + 1 entries.
        partition.foreach(item => {
          ts.add(item)
          if (ts.size > topN) {
            // The last element is the lowest ranked one under the ordering.
            ts.remove(ts.last)
          }
        })
        // Printed on the executor that processed the partition.
        println(ts.toList.toBuffer)
      })
    } finally {
      // Release the Spark resources even when the job fails.
      sc.stop()
    }
  }
}
/**
 * Ordering for ((subject, teacher), votes) records: highest vote count first.
 *
 * Records with the same vote count are further ordered by subject and then by
 * teacher name. Without this tie-break a sorted set would regard two different
 * teachers with equal votes as the same element and silently drop one of them.
 */
class SubjectOrdering extends Ordering[((String, String), Int)] {
  override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
    // ((subject, teacher), votes)
    // Integer.compare avoids the overflow that a plain subtraction can hit;
    // the arguments are swapped to obtain descending order.
    val byVotes = Integer.compare(y._2, x._2)
    if (byVotes != 0) {
      byVotes
    } else {
      val bySubject = x._1._1.compareTo(y._1._1)
      if (bySubject != 0) bySubject else x._1._2.compareTo(y._1._2)
    }
  }
}
/**
 * Partitioner that routes records by the subject part of their key.
 *
 * @param subjects subject names; each one is mapped to its position in this
 *                 array (a later duplicate overwrites an earlier entry), and
 *                 the array length is the number of partitions
 */
class SubjectPartition(subjects: Array[String]) extends Partitioner {
  // Lookup table: subject name -> partition id.
  val rules = new mutable.HashMap[String, Int]()
  // Next partition id to hand out while the table is being filled.
  var index = 0
  subjects.foreach { name =>
    rules.put(name, index)
    index += 1
  }

  // One partition per element of the subjects array.
  override def numPartitions: Int = subjects.length

  // The key is a (subject, teacher) tuple; only the subject decides the partition.
  override def getPartition(key: Any): Int =
    rules(key.asInstanceOf[(String, String)]._1)
}

Java 版本的实现

package day02;
import jdk.nashorn.internal.objects.NativeArray;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.net.URL;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeSet;
/**
 * Computes the top-N most popular teachers for every subject.
 *
 * Each input line is parsed as a URL: the host label before the first "."
 * is taken as the subject and the path without its leading "/" as the teacher.
 * Records are shuffled with a {@link MyPartitioner2} so that every subject ends
 * up in its own partition, and a bounded TreeSet then keeps only the best
 * {@code TOP_N} entries per partition instead of sorting the whole partition.
 */
public class SubjectAndTeacherDemo {
    /** Number of teachers to report for each subject. */
    private static final int TOP_N = 2;

    public static void main(String[] args) {
        // Run locally, using every available core.
        SparkConf conf = new SparkConf().setAppName("SubjectAndTeacherDemo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Read the raw access records from HDFS.
            JavaRDD<String> lines = sc.textFile("hdfs://hadoop01:9000/sparkTest");
            // Turn every line into ((subject, teacher), 1). Cached because the
            // RDD feeds two jobs below (subject scan and aggregation).
            JavaPairRDD<Tuple2<String, String>, Integer> subjectAndTeacher = lines.mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {
                @Override
                public Tuple2<Tuple2<String, String>, Integer> call(String s) throws Exception {
                    URL url = new URL(s);
                    String host = url.getHost();
                    String subject = host.substring(0, host.indexOf("."));
                    String teacher = url.getPath().substring(1);
                    return new Tuple2<>(new Tuple2<String, String>(subject, teacher), 1);
                }
            }).cache();
            // Re-key the records as (subject, teacher) so the subjects are the keys.
            JavaPairRDD<String, String> subjects = subjectAndTeacher.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, String>() {
                @Override
                public Tuple2<String, String> call(Tuple2<Tuple2<String, String>, Integer> tp) throws Exception {
                    return tp._1();
                }
            });
            // Distinct subjects, de-duplicated on the cluster before being
            // brought to the driver.
            Object[] subjectArr = subjects.keys().distinct().collect().toArray();
            // One partition per subject.
            MyPartitioner2 partitioner = new MyPartitioner2(subjectArr);
            // Sum the votes per (subject, teacher). The partitioner must be
            // passed here: without it the data stays hash-partitioned and a
            // partition no longer corresponds to a single subject.
            JavaPairRDD<Tuple2<String, String>, Integer> reduced = subjectAndTeacher.reduceByKey(partitioner, new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
            // Each partition now holds exactly one subject: pick its top N.
            reduced.foreachPartition(new VoidFunction<Iterator<Tuple2<Tuple2<String, String>, Integer>>>() {
                @Override
                public void call(Iterator<Tuple2<Tuple2<String, String>, Integer>> tuple2Iterator) throws Exception {
                    TreeSet<Tuple2<Tuple2<String, String>, Integer>> set =
                            new TreeSet<>(new Comparator<Tuple2<Tuple2<String, String>, Integer>>() {
                                @Override
                                public int compare(Tuple2<Tuple2<String, String>, Integer> o1, Tuple2<Tuple2<String, String>, Integer> o2) {
                                    // Highest vote count first; Integer.compare avoids subtraction overflow.
                                    int byVotes = Integer.compare(o2._2(), o1._2());
                                    if (byVotes != 0) {
                                        return byVotes;
                                    }
                                    // Tie-break on subject and teacher so that two teachers with
                                    // equal votes are not treated as the same set element.
                                    int bySubject = o1._1()._1().compareTo(o2._1()._1());
                                    if (bySubject != 0) {
                                        return bySubject;
                                    }
                                    return o1._1()._2().compareTo(o2._1()._2());
                                }
                            });
                    // Single pass over the partition; the set never holds more than TOP_N + 1 entries.
                    while (tuple2Iterator.hasNext()) {
                        set.add(tuple2Iterator.next());
                        if (set.size() > TOP_N) {
                            // The last element is the lowest ranked one.
                            set.remove(set.last());
                        }
                    }
                    // Printed on the executor that processed the partition.
                    for (Tuple2<Tuple2<String, String>, Integer> entry : set) {
                        System.out.println(entry);
                    }
                }
            });
        } finally {
            // Stop the context on the driver once the job is done (or failed).
            // It must not be referenced from inside the executor-side function above.
            sc.stop();
        }
    }
}
/**
 * Partitioner that routes records by the subject part of their key.
 *
 * Each element of the subject array is mapped (via toString) to its position
 * in the array; the array length is the number of partitions.
 */
class MyPartitioner2 extends Partitioner {
    Object[] subjectArr;
    // Lookup table: subject name -> partition id.
    HashMap<String, Integer> map = new HashMap<String, Integer>();

    /** Creates a partitioner without any subjects (zero partitions). */
    public MyPartitioner2() {
        // Delegate so that subjectArr is never null and numPartitions() cannot throw.
        this(new Object[0]);
    }

    /**
     * @param subjectArr the subjects; element i is assigned partition i
     */
    public MyPartitioner2(Object[] subjectArr) {
        this.subjectArr = subjectArr;
        int index = 0;
        for (Object obj : subjectArr) {
            map.put(obj.toString(), index);
            index += 1;
        }
    }

    /** @return the number of partitions, i.e. the length of the subject array */
    @Override
    public int numPartitions() {
        return subjectArr.length;
    }

    /**
     * @param key a (subject, teacher) tuple; only the subject decides the partition
     * @return the partition id registered for the subject
     * @throws IllegalArgumentException if the subject was not part of the subject array
     */
    @Override
    public int getPartition(Object key) {
        @SuppressWarnings("unchecked")
        Tuple2<String, String> tuple2 = (Tuple2<String, String>) key;
        String subject = tuple2._1();
        Integer partition = map.get(subject);
        if (partition == null) {
            // Fail with a clear message instead of an unboxing NullPointerException.
            throw new IllegalArgumentException("Unknown subject: " + subject);
        }
        return partition;
    }
}

 

 

最后

以上就是笨笨寒风为你收集整理的统计每个学科最受欢迎的老师前N名的全部内容,希望文章能够帮你解决统计每个学科最受欢迎的老师前N名所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部