概述
第一种方法:笨重方法 通过groupby 然后排序
object _02_FavTeacherTopn {
def main(args: Array[String]): Unit = {
val sc: SparkContext = SparkUtils.getSparkContext
val rdd1: RDD[String] = sc.textFile("data\anli\fav_teacher\teacher.dat")
//http://spark.doit.cn/laozhao
val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
// ArrayBuffer(http:, , spark.doit.cn, laozhang)
val arr: Array[String] = line.split("/")
val arr2 = arr(2).split("[.]")
((arr2(0),arr(3)),1)
})
//对key先进行聚合
这样可以使数据量大幅度的减少
val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)
val rdd4: RDD[(String, Iterable[((String, String), Int)])] = rdd3.groupBy(tp => {
tp._1._1
})
val rdd5: RDD[List[((String, String), Int)]] = rdd4.map(tp => {
//tolist操作
如果一个学科下面有很多的老师
会造成内存溢出
val tuples: List[((String, String), Int)] = tp._2.toList.sortBy(tp => {
-tp._2
})
tuples
})
val rdd6: RDD[List[((String, String), Int)]] = rdd5.map(line => {
//(line(0), line(1))
val tuples = line.take(3)
tuples
})
rdd6.foreach(println(_))
sc.stop()
}
}
第二种方法:聚合之后 先进行过滤,得到每一组的数值,然后利用takeOrdered取出topn
object _03_FavTeacherTopn {
def main(args: Array[String]): Unit = {
val sc: SparkContext = SparkUtils.getSparkContext
val rdd1: RDD[String] = sc.textFile("data\anli\fav_teacher\teacher.dat")
//http://spark.doit.cn/laozhao
val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
// ArrayBuffer(http:, , spark.doit.cn, laozhang)
val arr: Array[String] = line.split("/")
val arr2 = arr(2).split("[.]")
((arr2(0),arr(3)),1)
})
//对key先进行聚合
这样可以使数据量大幅度的减少
val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)
//首先,得到总的科目
val array: Array[String] = rdd3.map(_._1._1).distinct().collect()
//传入自定义的排序规则
/* implicit
def
myOrdering = {
new Ordering[((String, String), Int)] {
override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
- x._2 + y._2
}
}
}*/
implicit val ord: Ordering[((String, String), Int)] = Ordering[Int].on[((String, String), Int)](t => -t._2)
//进行缓存,方便多次进行复用
rdd3.persist(StorageLevel.MEMORY_AND_DISK_SER)
for (elem <- array) {
val rdd4: RDD[((String, String), Int)] = rdd3.filter(tp => {
tp._1._1 == elem
})
val rdd5: Array[((String, String), Int)] = rdd4.takeOrdered(3)
rdd5.foreach(println(_))
println("**********************")
}
//println(array.toBuffer)
sc.stop()
}
}
第三种方法:聚合之后 进行分组 与第一种方法的不同之处在于 迭代器不转换为集合后再进行排序,而是利用treeSet集合,能够自动排序
object _04_FavTeacherTopn {
def main(args: Array[String]): Unit = {
val sc: SparkContext = SparkUtils.getSparkContext
val rdd1: RDD[String] = sc.textFile("data\anli\fav_teacher\teacher.dat")
//http://spark.doit.cn/laozhao
val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
// ArrayBuffer(http:, , spark.doit.cn, laozhang)
val arr: Array[String] = line.split("/")
val arr2 = arr(2).split("[.]")
((arr2(0),arr(3)),1)
})
//对key先进行聚合
这样可以使数据量大幅度的减少
val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)
implicit val ord: Ordering[((String, String), Int)] = Ordering[Int].on[((String, String), Int)](t => -t._2)
val rdd4: RDD[(String, Iterable[((String, String), Int)])] = rdd3.groupBy(_._1._1)
val rdd5: RDD[(String, mutable.TreeSet[((String, String), Int)])] = rdd4.mapValues(iters => {
//仅仅对排序方法进行了修改,利用了TreeSet集合的特性,先分组
再排序
val set = new mutable.TreeSet[((String, String), Int)]
for (elem <- iters) {
set += elem
if (set.size > 3) {
set -= set.last
}
}
set
})
rdd5.foreach(println(_))
sc.stop()
}
}
第四种方法:聚合之后 不进行分组 而是进行分区 与第一种方法的不同之处在于 迭代器不转换为集合后再进行排序,而是利用treeSet集合,能够自动排序
object _05_FavTeacherTopn {
def main(args: Array[String]): Unit = {
val sc: SparkContext = SparkUtils.getSparkContext
val rdd1: RDD[String] = sc.textFile("data\anli\fav_teacher\teacher.dat")
//http://spark.doit.cn/laozhao
val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
// ArrayBuffer(http:, , spark.doit.cn, laozhang)
val arr: Array[String] = line.split("/")
val arr2 = arr(2).split("[.]")
((arr2(0),arr(3)),1)
})
//对key先进行聚合
这样可以使数据量大幅度的减少
val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)
//获取课程的集合
val list: List[String] = rdd3.map(_._1._1).distinct().collect().toList
implicit val ord: Ordering[((String, String), Int)] = Ordering[Int].on[((String, String), Int)](t => -t._2)
val rdd4: RDD[((String, String), Int)] = rdd3.partitionBy(new MyPartitioner(list))
val rdd5: RDD[((String, String), Int)] = rdd4.mapPartitions(iters => {
//仅仅对排序方法进行了修改,利用了TreeSet集合的特性,先分组
再排序
val set = new mutable.TreeSet[((String, String), Int)]
for (tp <- iters) {
set += tp
if (set.size > 3) {
set -= set.last
}
}
set.iterator
})
rdd5.foreach(println(_))
sc.stop()
}
}
第五种方法:自定义分区器,保证一个分区内有且仅有一个学科的数据,在调用reduceByKey时就使用自定义的分区器
object _06_FavTeacherTopn {
def main(args: Array[String]): Unit = {
val sc: SparkContext = SparkUtils.getSparkContext
val rdd1: RDD[String] = sc.textFile("data\anli\fav_teacher\teacher.dat")
//http://spark.doit.cn/laozhao
val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
// ArrayBuffer(http:, , spark.doit.cn, laozhang)
val arr: Array[String] = line.split("/")
val arr2 = arr(2).split("[.]")
((arr2(0),arr(3)),1)
})
//获取课程的集合
val list: List[String] = rdd2.map(_._1._1).distinct().collect().toList
//对key先进行聚合
这样可以使数据量大幅度的减少
val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(new MyPartitioner(list),_ + _)
implicit val ord: Ordering[((String, String), Int)] = Ordering[Int].on[((String, String), Int)](t => -t._2)
val rdd4: RDD[((String, String), Int)] = rdd3.mapPartitions(iters => {
//仅仅对排序方法进行了修改,利用了TreeSet集合的特性,先分组
再排序
val set = new mutable.TreeSet[((String, String), Int)]
for (tp <- iters) {
set += tp
if (set.size > 3) {
set -= set.last
}
}
set.iterator
})
rdd4.foreach(println(_))
sc.stop()
}
}
//通过构造器传入课程的集合
class MyPartitioner(val list:List[String]) extends Partitioner{
//通过hashMap
private val hashMap = new mutable.HashMap[String, Int]()
var i:Int = 0
for (elem <- list) {
hashMap ++= Map((elem, i))
i += 1
}
//自定义分区的个数为课程的个数
override def numPartitions: Int = list.size
//自定义分区的规则
override def getPartition(key: Any): Int = {
val sub: String = key.asInstanceOf[(String, String)]._1
val maybeInt: Option[Int] = hashMap.get(sub)
val value: Int = maybeInt.get
value
}
}
//工具类
object SparkUtils {
Logger.getLogger("org").setLevel(Level.ERROR)
def getSparkContext: SparkContext = {
val conf: SparkConf = new SparkConf()
conf.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc
}
}
/**
* 手动生成大量的数据
*/
object _01_GenerateTeacherData {
def main(args: Array[String]): Unit = {
//http://spark.doit.cn/tom
val cou = Array("spark", "flink", "java", "hadoop")
val tea = Array("tom", "rose", "jack", "hank")
val writer: PrintWriter = new PrintWriter(new File("data\anli\fav_teacher\teacher.dat"))
val ran = new Random()
for (i <- 1 to 100000){
val i = ran.nextInt(cou.length)
val cou1: String = cou(i)
val j = ran.nextInt(tea.length)
val tea1: String = tea(j)
writer.println(s"http://$cou1.study.cn/$tea1")
}
writer.close()
}
}
最后
以上就是唠叨画笔为你收集整理的用spark求每一个学科最受欢迎的N个老师(分组Topn,多种方法)的全部内容,希望文章能够帮你解决用spark求每一个学科最受欢迎的N个老师(分组Topn,多种方法)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复