概述
概念解析
参数为函数,且函数的形参是个rdd类型,返回值为Dstream类型
def transform[U : ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
Return a new DStream in which each RDD is generated by applying a
function on each RDD of ‘this’ DStream.
案例
以nc作为测试源,对黑名单进行过滤
nc -lk mypc01 10087
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
* 有若干个男生被列为澡堂黑名单
* 比如zhangsan lisi
*/
object TransformDemo extends App {
private val conf = new SparkConf().setAppName("test").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
private val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
//以RDD的形式存储黑名单
val list = List(("zhangsan", true), ("lisi", true))
private val blackList: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(list)
//接受socket传输的数据转为Dstream
private val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)
//返回一个新的DStream,其中通过对“ this” DStream的每个RDD应用一个函数来生成每个RDD。
//黑名单数据与socket传输的数据依据key进行关联,同时过滤掉key一样的数据
//transform的形参是个rdd,就可以用rdd的一系列算子,rdd的算子比Dstream多
private val dstream1: DStream[(String, (Int, Option[Boolean]))] = dstream.map((_, 1)).transform((rdd: RDD[(String, Int)], time: Time) => {
//socket来的数据与黑名单数据进行join
val rdd1: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(blackList)
//过滤掉合并后没有值的
//filter里函数返回值为false的话,就不会包含了
var rdd2: RDD[(String, (Int, Option[Boolean]))] = rdd1.filter((x: (String, (Int, Option[Boolean]))) => {
//如果什么也没找到就返回默认值,也就是false,否则返回true
if (x._2._2.getOrElse(false)) {
false
} else {
true
}
})
rdd2
})
dstream1.print()
ssc.start()
ssc.awaitTermination()
}
测试时如果输入黑名单中的内容则不会有结果输出,如果非黑名单的则会输出打印
比如wuren不在黑名单里
(wuren,(1,None))
另外,判断那块写成这样也可以,更简单些
if(x._2._2==null) false else true
案例用到的方法解析
final def getOrElse[B >: A](default: => B): B
如果选项非空,则返回选项的值,否则返回评估默认值的结果。
def filter(f: T => Boolean): RDD[T]
Return a new RDD containing only the elements that satisfy a
predicate.
总结
transform算子使得Dstream可以用一些属于RDD的算子,然后返回一个新的Dstream,因为Dstream的算子比Rdd的算子数量上要少一些.
最后
以上就是还单身手套为你收集整理的SparkStreaming transform算子入门案例的全部内容,希望文章能够帮你解决SparkStreaming transform算子入门案例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复