概述
文章目录
- 前言
- 代码
- 总结
- 黑名单
前言
本文是SparkStreaming的转换操作之transform的特殊之处
以下是本篇文章代码部分,所用语言是scala
代码
在代码中会首先展示普通的转换操作,然后再展示transform操作。
val socketLineDStream: ReceiverInputDStream[String] = streamingContext.socketTextStream('linux1', 8888)
// TODO Driver中执行一次
// 例如val a = 1 在Driver中只执行一次
// 首先看一下DStream的其他Transformations(转换)操作
socketLineDStream.map({
case x => {
// TODO Executor中执行n次(n是Executor数)
}
}
)
// 重点来了,看一下DStream的transform转换操作
socketLineDStream.transform({
case rdd => {
// TODO Driver中执行m次(m是采集周期数)
rdd.map({
case x => {
//TODO Executor中执行n次(n是Executor数)
}
})
}
}
)
总结
下面对文章进行总结:
- transform中的注释处的m就是细节之处,它可以保证此处运行在Driver中的代码可以周期(SparkStreaming的数据采集周期)间变化,即每个数据周期transform走一遍。用处之一是黑名单的更新(比如恶意发帖的用户的更新)
- 为什么DStream.map里面的代码执行是在Executor?个人理解因为DStream在每个周期(批次)相当于就是一个RDD的封装,所以可以类比RDD.map()里面的代码是运行在Executor端
黑名单
补充黑名单的粗略代码:
object RealTimeApp {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("RealTimeApp").setMaster("local[*]")
//2.创建StreamContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.读取Kafka数据
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream("ads_log", ssc)
//4.将从Kafka读取的数据转换成样例类对象
val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(
record => {
val value: String = record.value()
val arr: Array[String] = value.split(" ")
Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
}
)
//5.需求一:根据MySQL中的黑名单过滤当前数据集
val filterAdsLogDStream: DStream[Ads_log] = BlackListHandler2.filterByBlackList(adsLogDStream)
//6.需求二:将满足要求的用户写入黑名单
BlackListHandler2.addBlackList(filterAdsLogDStream)
//测试打印
filterAdsLogDStream.cache()
filterAdsLogDStream.count().print()
//启动任务
ssc.start()
ssc.awaitTermination()
}
case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)
}
// 放到里面和外面都行
// case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)
补充黑名单的详细代码:
object SparkStreaming11_Req1_BlackList1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kafkaPara)
)
val adClickData = kafkaDataDS.map(
kafkaData => {
val data = kafkaData.value()
val datas = data.split(" ")
AdClickData(datas(0),datas(1),datas(2),datas(3),datas(4))
}
)
val ds = adClickData.transform(
rdd => {
// TODO 通过JDBC周期性获取黑名单数据
val blackList = ListBuffer[String]()
val conn = JDBCUtil.getConnection
val pstat = conn.prepareStatement("select userid from black_list")
val rs: ResultSet = pstat.executeQuery()
while ( rs.next() ) {
blackList.append(rs.getString(1))
}
rs.close()
pstat.close()
conn.close()
// TODO 判断点击用户是否在黑名单中
val filterRDD = rdd.filter(
data => {
!blackList.contains(data.user)
}
)
// TODO 如果用户不在黑名单中,那么进行统计数量(每个采集周期)
filterRDD.map(
data => {
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val day = sdf.format(new java.util.Date( data.ts.toLong ))
val user = data.user
val ad = data.ad
(( day, user, ad ), 1) // (word, count)
}
).reduceByKey(_+_)
}
)
ds.foreachRDD(
rdd => {
// rdd. foreach方法会每一条数据创建连接
// foreach方法是RDD的算子,算子之外的代码是在Driver端执行,算子内的代码是在Executor端执行
// 这样就会涉及闭包操作,Driver端的数据就需要传递到Executor端,需要将数据进行序列化
// 数据库的连接对象是不能序列化的。
// RDD提供了一个算子可以有效提升效率 : foreachPartition
// 可以一个分区创建一个连接对象,这样可以大幅度减少连接对象的数量,提升效率
rdd.foreachPartition(
iter => {
val conn = JDBCUtil.getConnection
iter.foreach{
case ( ( day, user, ad ), count ) => {
}
}
conn.close()
}
)
rdd.foreach{
case ( ( day, user, ad ), count ) => {
println(s"${day} ${user} ${ad} ${count}")
if ( count >= 30 ) {
// TODO 如果统计数量超过点击阈值(30),那么将用户拉入到黑名单
val conn = JDBCUtil.getConnection
val sql = """
|insert into black_list (userid) values (?)
|on DUPLICATE KEY
|UPDATE userid = ?
""".stripMargin
JDBCUtil.executeUpdate(conn, sql, Array( user, user ))
conn.close()
} else {
// TODO 如果没有超过阈值,那么需要将当天的广告点击数量进行更新。
val conn = JDBCUtil.getConnection
val sql = """
|select
| *
| from user_ad_count
|where dt = ? and userid = ? and adid = ?
""".stripMargin
val flg = JDBCUtil.isExist(conn, sql, Array( day, user, ad ))
// 查询统计表数据
if (flg) {
// 如果存在数据,那么更新
val sql1 = """
|update user_ad_count
| set count = count + ?
| where dt = ? and userid = ? and adid = ?
""".stripMargin
JDBCUtil.executeUpdate(conn, sql1, Array(count, day, user, ad))
// TODO 判断更新后的点击数据是否超过阈值,如果超过,那么将用户拉入到黑名单。
val sql2 = """
|select
| *
|from user_ad_count
|where dt = ? and userid = ? and adid = ? and count >= 30
""".stripMargin
val flg1 = JDBCUtil.isExist(conn, sql2, Array( day, user, ad ))
if ( flg1 ) {
val sql3 = """
|insert into black_list (userid) values (?)
|on DUPLICATE KEY
|UPDATE userid = ?
""".stripMargin
JDBCUtil.executeUpdate(conn, sql3, Array( user, user ))
}
} else {
val sql4 = """
| insert into user_ad_count ( dt, userid, adid, count ) values ( ?, ?, ?, ? )
""".stripMargin
JDBCUtil.executeUpdate(conn, sql4, Array( day, user, ad, count ))
}
conn.close()
}
}
}
}
)
ssc.start()
ssc.awaitTermination()
}
}
// 广告点击数据
case class AdClickData( ts:String, area:String, city:String, user:String, ad:String )
最后
以上就是明亮毛豆为你收集整理的sparkStreaming之transform的细节前言代码总结黑名单的全部内容,希望文章能够帮你解决sparkStreaming之transform的细节前言代码总结黑名单所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复