Spark小练习——求各科老师最受欢迎的TopN
【注】本文参考自小牛学堂学习视频
Spark小练习——求各科老师最受欢迎的TopN
数据格式:http://bigdata.edu360.cn/laozhang
1.数据切分
复制代码
1
2
3
4
5
6
7
8val func=(line:String)=>{ val index=line.lastIndexOf("/") val teacher=line.substring(index+1) val httpHost=line.substring(0,index) val subject=new URL(httpHost).getHost.split("[.]")(0) // (subject,teacher) //(teacher,1) }
2.逻辑计算
2.1求所有科目中最受欢迎的老师topN
复制代码
1
2
3
4
5
6
7//拿到数据源 val lines=sc.textFile(path) val teacherAndOne=lines.map(func) val reduced=teacherAndOne.reduceByKey(_+_) val sorted=reduced.sortBy(_._2,false) val result=sorted.top(topN))
2.2求各科最受欢迎老师的topN
(1)使用Scala中的sortBy方法(适用于数据量较小的情况)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13val lines=sc.textFile(path) val subjectAndTeacher=lines.map(func) val maped=subjectAndTeacher.map((_,1)) val reduced=maped.reduceByKey(_+_) //按学科分组,得到的key是学科,value是学科对应的老师数据的迭代器 val grouped: RDD[(String, Iterable[((String, String), Int)])]=reduced.groupBy(_._1._1) //将每一个组拿出来进行操作 //为什么可以调用scala的sortBy方法 //因为一个学科的数据已经在一台机器上的一个集合里了(缺点:在内存中序, //如果数据量大的话,可能会出问题) val sorted=grouped.mapValues(_.toList.sortBy(_._2).reverse.take(topN)) //数据量较小,所以就直接收集了,也可以把它存储到文件中 val result=sorted.collect()
(2)使用RDD中的sortBy方法(适用于数据量较大的情况)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13val lines=sc.textFile(path) val subjectAndTeacher=lines.map(func) val subjects=subjectAndTeacher.keys.distinct() val maped=subjectAndTeacher.map((_,1)) val reduced=maped.reduceByKey(_+_) for(sb <- subjects){ val filter=reduce.filter(_._1.equals(sb)) //现在调用的是RDD上的sortBy方法,可在内存与磁盘中 //take方法是先在Executor中取好前几个再通过网络发送到Driver,是个 //Action val r=filter.sortBy(_._2,false).take(topN) //然后把r收集或存储起来 }
(3)自定义分区器,以学科来分区
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21i.分区器SubjectPartitioner class SubjectPartitioner(subjects:Array[String]) extends Partitioner{ //相当于主构造器(new的时候会执行一次) //用于存放规则的一个map val rules=new mutable.HashMap[String,Int]() var i=0 for(sb <- subjects){ rules(sb)=i i=i+1 } //返回分区的数量(下一个RDD有多少分区) override def numPartitions:Int =subjects.length //根据传入的key计算分区标号 override def getPartition(key: Any):Int ={ //key是一个元组(学科,老师) //sb:学科 val sb=key.asInstanceOf[(String,String)]._1 //根据规则计算分区编号 rules(sb) } }
ii.逻辑
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18val lines=sc.textFile(path) val subjectAndTeacher=lines.map(func) val subjects=subjectAndTeacher.keys.distinct() val maped=subjectAndTeacher.map((_,1)) //聚合 //第一次shuffle val reduced=maped.reduceByKey(_+_) //自定义分区器,按照指定的分区器来进行分区 //partitionBy按照指定的分区规则来分区 //第二次shuffle val partitioned: RDD[((String, String), Int)] =reduce.partitionBy(new SubjectPartitioner(subjects)) //一次操作一个分区 val sorted=partitioned.mapPartitions(it => { //将迭代器转换成List然后排序再转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator //缺点:又是加载到内存再排序 })
(4)减少shuffle次数
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17val lines=sc.textFile(path) val subjectAndTeacher=lines.map(func) val subjects=subjectAndTeacher.keys.distinct() val maped=subjectAndTeacher.map((_,1)) //分区器 val sbPartitioner=new SubjectPartitioner(subjects) //聚合 //第一次shuffle val reduced=maped.reduceByKey(sbPartitioner,_+_) //一次操作一个分区 val sorted=reduced.mapPartitions(it => { //将迭代器转换成List然后排序再转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator //缺点:又是加载到内存再排序 //优化:即排序,又不全部加载到内存 //考虑用一个定长TreeSet来装从迭代器中取出的数据,然后排序留下topN,后面的再装入新的数据,再排序,重复操作直到迭代完该学科的所有数据 })
最后
以上就是活泼镜子最近收集整理的关于Spark小练习——求各科老师最受欢迎的TopNSpark小练习——求各科老师最受欢迎的TopN的全部内容,更多相关Spark小练习——求各科老师最受欢迎内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复