我是靠谱客的博主 还单身手套,最近开发中收集的这篇文章主要介绍SparkStreaming transform算子入门案例,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

概念解析

参数为函数,且函数的形参是个rdd类型,返回值为Dstream类型

def transform[U : ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

Return a new DStream in which each RDD is generated by applying a
function on each RDD of ‘this’ DStream.

案例

以nc作为测试源,对黑名单进行过滤

nc -lk mypc01 10087
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * 有若干个男生被列为澡堂黑名单
 * 比如zhangsan lisi
 */
object TransformDemo extends App {
  private val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  private val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

  //以RDD的形式存储黑名单
  val list = List(("zhangsan", true), ("lisi", true))
  private val blackList: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(list)
  //接受socket传输的数据转为Dstream
  private val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)
  //返回一个新的DStream,其中通过对“ this” DStream的每个RDD应用一个函数来生成每个RDD。
  //黑名单数据与socket传输的数据依据key进行关联,同时过滤掉key一样的数据
  //transform的形参是个rdd,就可以用rdd的一系列算子,rdd的算子比Dstream多
  private val dstream1: DStream[(String, (Int, Option[Boolean]))] = dstream.map((_, 1)).transform((rdd: RDD[(String, Int)], time: Time) => {
    //socket来的数据与黑名单数据进行join
    val rdd1: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(blackList)
    //过滤掉合并后没有值的
    //filter里函数返回值为false的话,就不会包含了
    var rdd2: RDD[(String, (Int, Option[Boolean]))] = rdd1.filter((x: (String, (Int, Option[Boolean]))) => {
    //如果什么也没找到就返回默认值,也就是false,否则返回true
      if (x._2._2.getOrElse(false)) {
        false
      } else {
        true
      }
    })
    rdd2
  })
  dstream1.print()
  ssc.start()
  ssc.awaitTermination()
}

测试时如果输入黑名单中的内容则不会有结果输出,如果非黑名单的则会输出打印
比如wuren不在黑名单里

(wuren,(1,None))

另外,判断那块写成这样也可以,更简单些

 if(x._2._2==null) false else true

案例用到的方法解析

final def getOrElse[B >: A](default: => B): B

如果选项非空,则返回选项的值,否则返回评估默认值的结果。

def filter(f: T => Boolean): RDD[T]

Return a new RDD containing only the elements that satisfy a
predicate.

总结

transform算子使得Dstream可以用一些属于RDD的算子,然后返回一个新的Dstream,因为Dstream的算子比Rdd的算子数量上要少一些.

最后

以上就是还单身手套为你收集整理的SparkStreaming transform算子入门案例的全部内容,希望文章能够帮你解决SparkStreaming transform算子入门案例所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部