我是靠谱客的博主 魁梧红牛,最近开发中收集的这篇文章主要介绍Spark- 求最受欢迎的TopN课程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

数据库操作工具类

package com.rz.mobile_tag.utils
import java.sql.{Connection, DriverManager, PreparedStatement}
object MySQLUtils {
/**
* 获取数据库连接
* @return
*/
def getConnection()={
DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?user=root&password=root")
}
/**
* 释放数据库连接等资源
* @param conn
* @param pstmt
*/
def release(conn:Connection, pstmt:PreparedStatement)={
try {
if (pstmt !=null){
pstmt.close()
}
}catch {
case e:Exception => e.printStackTrace()
}finally {
if (conn != null){
conn.close()
}
}
}
}

数据操作类:优化点(使用批量插入数据库,提交使用batch操作)

package com.rz.mobile_tag.dao
import java.sql.{Connection, PreparedStatement}
import com.rz.mobile_tag.bean.DayVideoAccessStat
import com.rz.mobile_tag.utils.MySQLUtils
import scala.collection.mutable.ListBuffer
object StatDao {
/**
* 批量保存DayVideoAccessStat到数据库
* @param list
*/
def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
var connection:Connection = null;
var pstmt:PreparedStatement = null;
try {
connection = MySQLUtils.getConnection()
connection.setAutoCommit(false) // 设置手动提交

val sql ="insert into day_video_access_topn_stat(day,cms_id,times) value(?,?,?)"
pstmt = connection.prepareStatement(sql)
for (ele <- list){
pstmt.setString(1, ele.day)
pstmt.setLong(2,ele.cmsId)
pstmt.setLong(3, ele.times)
pstmt.addBatch()
}
pstmt.executeBatch() // 执行批量处理
connection.commit() // 手工提交
}catch {
case e:Exception =>e.printStackTrace()
}finally {
MySQLUtils.release(connection, pstmt)
}
}
}

 

业务实现类

package com.rz.mobile_tag.log
import com.rz.mobile_tag.bean.DayVideoAccessStat
import com.rz.mobile_tag.dao.StatDao
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object TopNStatJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}")
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
.master("local[2]")
.getOrCreate()
val accessDF: DataFrame = spark.read.format("parquet").load(args(0))
accessDF.printSchema()
accessDF.show(false)
// 最受欢迎的TopN课程

videoAccessTopNStat(spark, accessDF)
spark.stop()
}
/**
* 最受欢迎的TopN课程
* @param spark
* @param accessDF
*/
def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame) = {
//
import spark.implicits._
//
val videoAccesssTopNDF: Dataset[Row] = accessDF.filter($"day" === "20190506" && $"cmsType" === "video")
//
.groupBy("day", "cmsId")
//
.agg(count("cmsId")).as("times").orderBy($"times".desc)
//
videoAccesssTopNDF.show(false)

accessDF.createOrReplaceTempView("access_logs")
// 使用SQL方式进行统计
val videoAccesssTopNDF: DataFrame = spark.sql("select day, cmsId, count(1) as times from access_logs" +
" where day = '20190506' and cmsType = 'video' group by day, cmsId" +
" order by times desc")
//videoAccesssTopNDF.show(false)
// 将统计数据写入到MySQL中
try{
videoAccesssTopNDF.foreachPartition(partitionOfRecords=>{
val list = new ListBuffer[DayVideoAccessStat]
partitionOfRecords.foreach(info =>{
val day = info.getAs[String]("day")
val cmsId = info.getAs[Long]("cmsId")
val times = info.getAs[Long]("times")
list.append(DayVideoAccessStat(day, cmsId, times))
})
StatDao.insertDayVideoAccessTopN(list)
})
}catch {
case e:Exception => e.printStackTrace()
}
}
}

 

转载于:https://www.cnblogs.com/RzCong/p/10818916.html

最后

以上就是魁梧红牛为你收集整理的Spark- 求最受欢迎的TopN课程的全部内容,希望文章能够帮你解决Spark- 求最受欢迎的TopN课程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部