概述
项目说明:附件为要计算数据的demo。点击打开链接
利用spark的缓存机制,读取需要筛选的数据,自定义一个分区器,将不同的学科数据分别放到一个分区器中,并且根据指定的学科,取出点击量前三的数据,并写入文件。
具体程序如下:
1、项目主程序:
-
package cn.allengao.Location
-
-
import java.net.URL
-
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
-
/**
-
* class_name:
-
* package:
-
* describe: 缓存机制,自定义一个分区器,根据指定的学科, 取出点击量前三的,按照每种学科数据放到不同的分区器里
-
* creat_user: Allen Gao
-
* creat_date: 2018/1/30
-
* creat_time: 11:21
-
**/
-
-
object AdvUrlCount {
-
-
def main(args: Array[String]) {
-
-
//从数据库中加载规则
-
// val arr = Array("java.learn.com", "php.learn.com", "net.learn.com")
-
-
val conf = new SparkConf().setAppName(
"AdvUrlCount").setMaster(
"local[2]")
-
val sc = new SparkContext(conf)
-
-
//获取数据
-
val file = sc.textFile(
"j://information/learn.log")
-
//提取出url并生成一个元祖,rdd1将数据切分,元组中放的是(URL, 1)
-
val urlAndOne = file.map(line => {
-
val fields = line.split(
"t")
-
val url = fields(
1)
-
(url,
1)
-
})
-
//把相同的url进行聚合
-
val sumedUrl = urlAndOne.reduceByKey(_ + _)
-
-
//获取学科信息缓存,提高运行效率
-
val cachedProject = sumedUrl.map(x => {
-
val url = x._1
-
val project = new URL(url).getHost
-
val count = x._2
-
(project, (url, count))
-
}).cache()
-
-
//调用Spark自带的分区器此时会发生哈希碰撞,会有数据倾斜问题产生,需要自定义分区器
-
// val res = cachedProject.partitionBy(new HashPartitioner(3))
-
// res.saveAsTextFile("j://information//out")
-
-
//得到所有学科
-
val projects = cachedProject.keys.distinct().collect()
-
//调用自定义分区器并得到分区号
-
val partitioner = new ProjectPartitioner(projects)
-
-
//分区
-
val partitioned: RDD[(String, (String,
Int))] = cachedProject.partitionBy(partitioner)
-
-
//对每个分区的数据进行排序并取top3
-
val res = partitioned.mapPartitions(it => {
-
it.toList.sortBy(_._2._2).reverse.take(
3).iterator
-
})
-
res.saveAsTextFile(
"j://information//out1")
-
-
sc.stop()
-
}
-
}
2、自定义分区器:
-
package cn.allengao.
Location
-
-
import org.apache.spark.Partitioner
-
-
import scala.collection.mutable
-
-
class ProjectPartitioner(projects: Array[String]) extends Partitioner {
-
//用来存放学科和分区号
-
private val projectsAndPartNum = new mutable.
HashMap[
String,
Int]()
-
//计数器,用于指定分区号
-
var n =
0
-
-
for(pro<-projects){
-
projectsAndPartNum += (pro -> n)
-
n +=
1
-
}
-
//得到分区数
-
override def numPartitions = projects.length
-
//得到分区号
-
override def getPartition(key:
Any) = {
-
projectsAndPartNum.getOrElse(key.
toString,
0)
-
}
-
}
运行结果:
最后
以上就是贪玩大侠为你收集整理的spark计算用户访问学科子网页的top3的全部内容,希望文章能够帮你解决spark计算用户访问学科子网页的top3所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复