我是靠谱客的博主 贪玩大侠,最近开发中收集的这篇文章主要介绍spark计算用户访问学科子网页的top3,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

项目说明:附件为要计算数据的demo。点击打开链接

利用spark的缓存机制,读取需要筛选的数据,自定义一个分区器,将不同的学科数据分别放到一个分区器中,并且根据指定的学科,取出点击量前三的数据,并写入文件。

具体程序如下:

1、项目主程序:


  1. package cn.allengao.Location
  2. import java.net.URL
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
  5. /**
  6. * class_name:
  7. * package:
  8. * describe: 缓存机制,自定义一个分区器,根据指定的学科, 取出点击量前三的,按照每种学科数据放到不同的分区器里
  9. * creat_user: Allen Gao
  10. * creat_date: 2018/1/30
  11. * creat_time: 11:21
  12. **/
  13. object AdvUrlCount {
  14. def main(args: Array[String]) {
  15. //从数据库中加载规则
  16. // val arr = Array("java.learn.com", "php.learn.com", "net.learn.com")
  17. val conf = new SparkConf().setAppName( "AdvUrlCount").setMaster( "local[2]")
  18. val sc = new SparkContext(conf)
  19. //获取数据
  20. val file = sc.textFile( "j://information/learn.log")
  21. //提取出url并生成一个元祖,rdd1将数据切分,元组中放的是(URL, 1)
  22. val urlAndOne = file.map(line => {
  23. val fields = line.split( "t")
  24. val url = fields( 1)
  25. (url, 1)
  26. })
  27. //把相同的url进行聚合
  28. val sumedUrl = urlAndOne.reduceByKey(_ + _)
  29. //获取学科信息缓存,提高运行效率
  30. val cachedProject = sumedUrl.map(x => {
  31. val url = x._1
  32. val project = new URL(url).getHost
  33. val count = x._2
  34. (project, (url, count))
  35. }).cache()
  36. //调用Spark自带的分区器此时会发生哈希碰撞,会有数据倾斜问题产生,需要自定义分区器
  37. // val res = cachedProject.partitionBy(new HashPartitioner(3))
  38. // res.saveAsTextFile("j://information//out")
  39. //得到所有学科
  40. val projects = cachedProject.keys.distinct().collect()
  41. //调用自定义分区器并得到分区号
  42. val partitioner = new ProjectPartitioner(projects)
  43. //分区
  44. val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)
  45. //对每个分区的数据进行排序并取top3
  46. val res = partitioned.mapPartitions(it => {
  47. it.toList.sortBy(_._2._2).reverse.take( 3).iterator
  48. })
  49. res.saveAsTextFile( "j://information//out1")
  50. sc.stop()
  51. }
  52. }
2、自定义分区器:


  1. package cn.allengao. Location
  2. import org.apache.spark.Partitioner
  3. import scala.collection.mutable
  4. class ProjectPartitioner(projects: Array[String]) extends Partitioner {
  5. //用来存放学科和分区号
  6. private val projectsAndPartNum = new mutable. HashMap[ String, Int]()
  7. //计数器,用于指定分区号
  8. var n = 0
  9. for(pro<-projects){
  10. projectsAndPartNum += (pro -> n)
  11. n += 1
  12. }
  13. //得到分区数
  14. override def numPartitions = projects.length
  15. //得到分区号
  16. override def getPartition(key: Any) = {
  17. projectsAndPartNum.getOrElse(key. toString, 0)
  18. }
  19. }

运行结果:







最后

以上就是贪玩大侠为你收集整理的spark计算用户访问学科子网页的top3的全部内容,希望文章能够帮你解决spark计算用户访问学科子网页的top3所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部