概述
数据
http://bigdata.51doit.cn/laozhang
http://bigdata.51doit.cn/laozhang
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laoduan
http://bigdata.51doit.cn/laoduan
http://javaee.51doit.cn/xiaoxu
http://javaee.51doit.cn/xiaoxu
http://javaee.51doit.cn/laoyang
http://javaee.51doit.cn/laoyang
http://javaee.51doit.cn/laoyang
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laoduan
http://bigdata.51doit.cn/laoduan
http://javaee.51doit.cn/xiaoxu
http://javaee.51doit.cn/xiaoxu
http://javaee.51doit.cn/laoyang
http://javaee.51doit.cn/laoyang
http://javaee.51doit.cn/laoyang
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laozhao
http://bigdata.51doit.cn/laoduan
http://bigdata.51doit.cn/laoduan
http://javaee.51doit.cn/xiaoxu
http://javaee.51doit.cn/xiaoxu
http://javaee.51doit.cn/laoyang
http://javaee.51doit.cn/laoyang
http://javaee.51doit.cn/laoyang
http://php.51doit.cn/laoli
http://php.51doit.cn/laoliu
http://php.51doit.cn/laoli
http://php.51doit.cn/laoli
......
要求
求出每个学科最受欢迎的topN个老师
实现方式一:基础实现
package com.doit.spark.day01
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object TeacherTopN1 {
def main(args: Array[String]): Unit = {
//传入是否在本地运行
val setMaster = args(0).toBoolean
//传入求的每个学科的受欢迎的老师的数量
val topN = args(1).toInt
val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")
//判断是否在本地运行
if (setMaster){
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//传入文件路径
//"D:\新建文件夹\spark-day05\资料\计算每个学科最受欢迎Top2.log"
val filePath = args(2)
val line: RDD[String] = sc.textFile(filePath)
val reduced: RDD[((String, String), Int)] = line.map(x => {
//切割得到(subject,teacher)
val arr1: Array[String] = x.split("/+")
val arr2: Array[String] = arr1(1).split("[.]")
val subject: String = arr2(0)
val teacher: String = arr1(2)
((subject, teacher), 1)
//聚合每个学科每个老师的访问数量
}).reduceByKey(_ + _)
//按学科进行分组,得到每个学科下,所以的访问数量,放入迭代器中
val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
//调用mapValues对每个学科中的迭代器进行处理
val sorted: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(it => {
//将迭代器转成List调用List中的排序方法对区内数据进行降序排序,并取前topN个
//但使用此方法会有一个问题,就是当某个学科的数量特别多的时候,toList会将数据全部放入内存中
//这样可能导致内存溢出,解决方法在TeacherTopN2中给出
val list: List[((String, String), Int)] = it.toList.sortBy(-_._2).take(topN)
list
})
println(sorted.collect().toBuffer)
}
}
实现方式二:按学科过滤
package com.doit.spark.day01
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import scala.math.Ordering
object TeacherTopN2 {
def main(args: Array[String]): Unit = {
//传入是否在本地运行
val setMaster = args(0).toBoolean
//传入求的每个学科的受欢迎的老师的数量
val topN = args(1).toInt
val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")
//判断是否在本地运行
if (setMaster){
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//传入文件路径
//"D:\新建文件夹\spark-day05\资料\计算每个学科最受欢迎Top2.log"
val filePath = args(2)
val line: RDD[String] = sc.textFile(filePath)
val reduced: RDD[((String, String), Int)] = line.map(x => {
//切割得到(subject,teacher)
val arr1: Array[String] = x.split("/+")
val arr2: Array[String] = arr1(1).split("[.]")
val subject: String = arr2(0)
val teacher: String = arr1(2)
((subject, teacher), 1)
//聚合每个学科每个老师的访问数量
}).reduceByKey(_ + _)
//去重拿到所有的学科
val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
//遍历subjects
for (elem <- subjects) {
//筛选出每个学科中所以的数据,老师进行降序排序,
// 这种方式会使整个程序,有多少个学科就触发几次action,但是调用了RDD的sortBy算子进行排序,可以避免内存溢出的风险
println(reduced.filter(x => x._1._1.equals(elem)).sortBy(_._1._2, false).take(topN).toBuffer)
}
//还可以直接调用takeOrdered算子,在该算子内部它会自动排序,并取出topN,可以减少一步shuffle的过程,但是需要我们自定义排序规则
for (elem <- subjects) {
//通过new Ordering的匿名内部类自定义排序规则
//
reduced.filter(x => x._1._1.equals(elem)).takeOrdered(topN)(new Ordering[((String, String), Int)] {
//
override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int =
//
-(x._2 - y._2)
//
})
//通过隐式转换自定义Ordering的排序规则
implicit val ord: Ordering[((String, String), Int)] = Ordering[Int].on[((String,String),Int)](t => -t._2)
println(reduced.filter(x => x._1._1.equals(elem)).takeOrdered(topN).toBuffer)
}
}
}
实现方式三:自定义分区,TeeeSet排序
package com.doit.spark.day01
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object TeacherTopN3 {
def main(args: Array[String]): Unit = {
//传入是否在本地运行
val setMaster = args(0).toBoolean
//传入求的每个学科的受欢迎的老师的数量
val topN = args(1).toInt
val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")
//判断是否在本地运行
if (setMaster){
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//传入文件路径
//"D:\新建文件夹\spark-day05\资料\计算每个学科最受欢迎Top2.log"
val filePath = args(2)
val line: RDD[String] = sc.textFile(filePath)
val reduced: RDD[((String, String), Int)] = line.map(x => {
//切割得到(subject,teacher)
val arr1: Array[String] = x.split("/+")
val arr2: Array[String] = arr1(1).split("[.]")
val subject: String = arr2(0)
val teacher: String = arr1(2)
((subject, teacher), 1)
//聚合每个学科每个老师的访问数量
}).reduceByKey(_ + _)
//获取所有的subjects,传入到自定义的分区器中
val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
val subjectPartition = new SubjectPartition(subjects)
//将每个学科单独的分到一个区里面,然后就能对每个区进行单独操作
//partitionBy里面需要传入一个分区器
val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(subjectPartition)
//调用mapPartition,对每个区内的数据进行处理
val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions((it: Iterator[((String, String), Int)]) => {
//排序的时候,我们还可以使用TreeSet集合,筛选出我们想要的数据
//该集合的长度设置为topN+1,这样使用迭代器时,每装进来一个,再移除一个不符合最低的,就能取到我们想要的数据
//但是我们的数据是一个元组类型的数据,TreeSet排序时,会先看元组中的第一个元素,再看元组中的第二个元素按字典顺序进行排序
//所以我们需要使用隐式转换将我们自定义的排序规则传进去,这样它就会按照我们自己定义的排序规则进行排序
implicit val ord = Ordering[Int].on[((String, String), Int)](t => -t._2)
val treeSet = new mutable.TreeSet[((String, String), Int)]()
var i = 0
for (elem <- it) {
treeSet.add(elem)
//判断treeSet的长度大于topN了,说明它里面的元素比topN多了一个,那么我们需要移除最后一个,因为我们定义的是降序排序
if (treeSet.size > topN) {
treeSet.remove(treeSet.last)
}
}
//最后返回treeSet转成的迭代器,mapPartition算子需要返回一个迭代器
treeSet.iterator
})
println(sorted.collect().toBuffer)
}
}
//创建一个自定义的分区器,继承Partitioner
class SubjectPartition(subjects: Array[String]) extends Partitioner{
//重写方法,需要传入分区数量
override def numPartitions: Int = subjects.size
//new一个可变的HashMap,用来装学科名和分区编号
private val hashMap = new mutable.HashMap[String, Int]()
var index = 0
for (elem <- subjects) {
//将学科名和分区编号放入hashMap中,分区编号是递增的
hashMap(elem)=index
index += 1
}
//需要传入分区编号
override def getPartition(key: Any): Int = {
//因为方法中的key是Any类型的,需要将他转成(学科,老师)元组类型,然后取出其中的学科
val subject: String = key.asInstanceOf[(String, String)]._1
//返回该学科所对应的分区编号
hashMap(subject)
}
}
实现方式四:减少shuffle
package com.doit.spark.day01
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object TeacherTopN4 {
def main(args: Array[String]): Unit = {
//传入是否在本地运行
val setMaster = args(0).toBoolean
//传入求的每个学科的受欢迎的老师的数量
val topN = args(1).toInt
val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")
//判断是否在本地运行
if (setMaster){
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//传入文件路径
//"D:\新建文件夹\spark-day05\资料\计算每个学科最受欢迎Top2.log"
val filePath = args(2)
val line: RDD[String] = sc.textFile(filePath)
val rdd1: RDD[((String, String), Int)] = line.map(x => {
//切割得到(subject,teacher)
val arr1: Array[String] = x.split("/+")
val arr2: Array[String] = arr1(1).split("[.]")
val subject: String = arr2(0)
val teacher: String = arr1(2)
((subject, teacher), 1)
})
val subjects: Array[String] = rdd1.map(_._1._1).distinct().collect()
val subjectPartition = new SubjectPartition2(subjects)
//reduceByKey可以传入自定义分区器,然后在聚合并shuffle时按我们自定义的分区器进行shuffle分区并聚合
//省去了一步partitionBy,减少了一步shuffleg的过程
val rdd2: RDD[((String, String), Int)] = rdd1.reduceByKey(subjectPartition, _ + _)
val sorted = rdd2.mapPartitions((it: Iterator[((String, String), Int)]) => {
//排序的时候,我们还可以使用TreeSet集合,筛选出我们想要的数据
//该集合的长度设置为topN+1,这样使用迭代器时,每装进来一个,再移除一个不符合最低的,就能取到我们想要的数据
//但是我们的数据是一个元组类型的数据,TreeSet排序时,会先看元组中的第一个元素,再看元组中的第二个元素按字典顺序进行排序
//所以我们需要使用隐式转换将我们自定义的排序规则传进去,这样它就会按照我们自己定义的排序规则进行排序
implicit val ord = Ordering[Int].on[((String, String), Int)](t => -t._2)
val treeSet = new mutable.TreeSet[((String, String), Int)]()
var i = 0
for (elem <- it) {
treeSet.add(elem)
//判断treeSet的长度大于topN了,说明它里面的元素比topN多了一个,那么我们需要移除最后一个,因为我们定义的是降序排序
if (treeSet.size > topN) {
treeSet.remove(treeSet.last)
}
}
//最后返回treeSet转成的迭代器,mapPartition算子需要返回一个迭代器
treeSet.iterator
})
println(sorted.collect().toBuffer)
}
}
//创建一个自定义的分区器,继承Partitioner
class SubjectPartition2(subjects: Array[String]) extends Partitioner{
//重写方法,需要传入分区数量
override def numPartitions: Int = subjects.size
//new一个可变的HashMap,用来装学科名和分区编号
private val hashMap = new mutable.HashMap[String, Int]()
var index = 0
for (elem <- subjects) {
//将学科名和分区编号放入hashMap中,分区编号是递增的
hashMap(elem)=index
index += 1
}
//需要传入分区编号
override def getPartition(key: Any): Int = {
//因为方法中的key是Any类型的,需要将他转成(学科,老师)元组类型,然后取出其中的学科
val subject: String = key.asInstanceOf[(String, String)]._1
//返回该学科所对应的分区编号
hashMap(subject)
}
}
实现方式五:使用repartitionAndSortWithinPartitions
这个算子传入一个自定义分区器,会自动按分区标准进行分区,并在区内按key进行排序,也可以用隐式参数指定key排序的规则,但只能按key排序,因为它底层调用的是setKeyOrderiing
package com.doit.spark.day01
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object TeacherTopN5 {
def main(args: Array[String]): Unit = {
//传入是否在本地运行
val setMaster = args(0).toBoolean
//传入求的每个学科的受欢迎的老师的数量
val topN = args(1).toInt
val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")
//判断是否在本地运行
if (setMaster){
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//传入文件路径
//"D:\新建文件夹\spark-day05\资料\计算每个学科最受欢迎Top2.log"
val filePath = args(2)
val line: RDD[String] = sc.textFile(filePath)
val rdd1: RDD[((String, String), Int)] = line.map(x => {
//切割得到(subject,teacher)
val arr1: Array[String] = x.split("/+")
val arr2: Array[String] = arr1(1).split("[.]")
val subject: String = arr2(0)
val teacher: String = arr1(2)
((subject, teacher), 1)
//聚合每个学科每个老师的访问数量
}).reduceByKey(_+_)
//获取所有的学科,放入数组中
val array: Array[String] = rdd1.map(_._1._1).distinct().collect()
//调用自定义的分区器,传入学科的数组
val subjectPartition = new SubjectPartition3(array)
//使用隐式参数,自定义排序规则,因为这个算子只能用key排序,且数据得是k,v类型得,所以需要将聚合的数量放入key中,null作为v
val rdd2: RDD[((String, String, Int), Null)] = rdd1.map(x => {
((x._1._1, x._1._2, x._2), null)
})
//自定义排序规则
implicit val ord = Ordering[Int].on[(String, String, Int)](t => -t._3)
//传入自定义分区器的对象,使它将每个学科作为一个区,并在区内排序
val pastitonedAndSorted: RDD[(String, String, Int)] = rdd2.repartitionAndSortWithinPartitions(subjectPartition).map(_._1)
//在学科区内区前两名老师即可
val value: RDD[(String, String, Int)] = pastitonedAndSorted.mapPartitions(it => {
val tuples: Iterator[(String, String, Int)] = it.take(2)
tuples
})
println(value.collect().toBuffer)
}
}
//创建一个自定义的分区器,继承Partitioner
class SubjectPartition3(subjects: Array[String]) extends Partitioner{
//重写方法,需要传入分区数量
override def numPartitions: Int = subjects.size
//new一个可变的HashMap,用来装学科名和分区编号
private val hashMap = new mutable.HashMap[String, Int]()
var index = 0
for (elem <- subjects) {
//将学科名和分区编号放入hashMap中,分区编号是递增的
hashMap(elem)=index
index += 1
}
//需要传入分区编号
override def getPartition(key: Any): Int = {
//因为方法中的key是Any类型的,需要将他转成(学科,老师)元组类型,然后取出其中的学科
val subject: String = key.asInstanceOf[(String, String, Int)]._1
//返回该学科所对应的分区编号
hashMap(subject)
}
}
实现方式六:使用ShuffleRDD进行分区并排序
package com.doit.spark.day01
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
object TeacherTopN6 {
def main(args: Array[String]): Unit = {
//传入是否在本地运行
val setMaster = args(0).toBoolean
//传入求的每个学科的受欢迎的老师的数量
val topN = args(1).toInt
val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")
//判断是否在本地运行
if (setMaster) {
conf.setMaster("local[*]")
}
val sc = new SparkContext(conf)
//传入文件路径
//"D:\新建文件夹\spark-day05\资料\计算每个学科最受欢迎Top2.log"
val filePath = args(2)
val line: RDD[String] = sc.textFile(filePath)
val rdd1: RDD[((String, String), Int)] = line.map(x => {
//切割得到(subject,teacher)
val arr1: Array[String] = x.split("/+")
val arr2: Array[String] = arr1(1).split("[.]")
val subject: String = arr2(0)
val teacher: String = arr1(2)
((subject, teacher), 1)
//聚合每个学科每个老师的访问数量
}).reduceByKey(_ + _)
//获取所有的学科,放入数组中
val array: Array[String] = rdd1.map(_._1._1).distinct().collect()
//调用自定义的分区器,传入学科的数组
val subjectPartition = new SubjectPartition4(array)
//使用隐式参数,自定义排序规则,因为这个算子只能用key排序,且数据得是k,v类型得,所以需要将聚合的数量放入key中,null作为v
val rdd2: RDD[((String, String, Int), Null)] = rdd1.map(x => {
((x._1._1, x._1._2, x._2), null)
})
//自定义排序规则
val ord = Ordering[Int].on[(String, String, Int)](t => -t._3)
//new ShuffledRDD它也能帮我们自定义分区,并在区内自动排序,因为repartitionAndSortWithinPartitions底层就是调用的ShuffledRDD
// 定义三个参数类型代表得含义为:key的类型,value的类型,和聚合后value的类型
//括号内的参数为:要处理的RDD,和自定义的分区器,后面还要.setKeyOrdering设定排序规则,需要手动传入,因为它没有隐式参数
val shuffled: ShuffledRDD[(String, String, Int), Null, Null] = new ShuffledRDD[(String, String, Int), Null, Null](rdd2, subjectPartition).setKeyOrdering(ord)
val pastitonedAndSorted: RDD[(String, String, Int)] = shuffled.map(_._1)
val value: RDD[(String, String, Int)] = pastitonedAndSorted.mapPartitions(it => {
val tuples: Iterator[(String, String, Int)] = it.take(2)
tuples
})
println(value.collect().toBuffer)
}
}
//创建一个自定义的分区器,继承Partitioner
class SubjectPartition4(subjects: Array[String]) extends Partitioner{
//重写方法,需要传入分区数量
override def numPartitions: Int = subjects.size
//new一个可变的HashMap,用来装学科名和分区编号
private val hashMap = new mutable.HashMap[String, Int]()
var index = 0
for (elem <- subjects) {
//将学科名和分区编号放入hashMap中,分区编号是递增的
hashMap(elem)=index
index += 1
}
//需要传入分区编号
override def getPartition(key: Any): Int = {
//因为方法中的key是Any类型的,需要将他转成(学科,老师)元组类型,然后取出其中的学科
val subject: String = key.asInstanceOf[(String, String, Int)]._1
//返回该学科所对应的分区编号
hashMap(subject)
}
}
最后
以上就是清爽芒果为你收集整理的大数据之spark_每个学科最受欢迎的老师案例的全部内容,希望文章能够帮你解决大数据之spark_每个学科最受欢迎的老师案例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复