我是靠谱客的博主 唠叨灰狼,最近开发中收集的这篇文章主要介绍Spark RDD 行动算子,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.reduce

函数签名 def reduce(f: (T, T) => T): T

代码:

/**
* reduce()聚合
*/
object ActionDemo {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15))
println(rdd.reduce(_+_))
}
}

 2.collect

函数签名 def collect(): Array[T]

代码: 

/**
*count()返回RDD中元素个数
*/
object ActionDemo2{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15))
rdd.collect().foreach(println)
}
}

 3.count

函数签名 def count(): Long

代码: 

/**
*count()返回RDD中元素个数
*/
object ActionDemo2{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15))
val rdd2 =rdd.count()
println(rdd2)
}
}

 4.first

函数签名 def first(): T

返回 RDD 中的第一个元素

代码: 

/**
*first()返回RDD中的第一个元素
*/
object ActionDemo3{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15))
val rdd2 =rdd.first()
println(rdd2)
}
}

5.take

函数签名 def take(num: Int): Array[T]

代码:

/**
*take()返回由RDD前n个元素组成的数组
*/
object ActionDemo4{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15))
val rdd2 =rdd.take(2)
rdd2.foreach(println)
}
}

 6.takeOrdered

函数签名 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

代码:

/**
*takeOrdered()返回该RDD排序后前n个元素组成的数组
*/
object ActionDemo5{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
val rdd2 =rdd.takeOrdered(2)
rdd2.foreach(println)
}
}

 7.aggregate

函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

代码:

/**
* aggregate()案例
*/
object ActionDemo6{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
//22 23 25 9 19 110
//空分区也有默认值
val rdd2 =rdd.aggregate(10)(_+_,_+_)
println(rdd2)//318
}
}

 8.fold

函数签名 def fold(zeroValue: T)(op: (T, T) => T): T

代码:

/**
* fold()案例
*/
object ActionDemo7{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array(12,13,15,-1,9,100))
//22 23 25 9 19 110
//空分区也有默认值
val rdd2 =rdd.fold(10)(_+_)
println(rdd2)//318
}
}

 9.countByKey

函数签名 def countByKey(): Map[K, Long]

代码: 

/**
* countByKey()统计每种key的个数
*/
object ActionDemo8 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(Array((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
val rdd2 = rdd.countByKey()
println(rdd2)
}
}

 10.save 相关算子

函数签名

def saveAsTextFile(path: String): Unit

def saveAsObjectFile(path: String): Unit

def saveAsSequenceFile( path: String,

codec: Option[Class[_ <: CompressionCodec]] = None): Unit

将数据保存到不同格式的文件中

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")

 11.foreach

函数签名

def foreach(f: T => Unit): Unit = withScope {

val cleanF = sc.clean(f)

sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }

分布式遍历 RDD 中的每一个元素,调用指定函数

最后

以上就是唠叨灰狼为你收集整理的Spark RDD 行动算子的全部内容,希望文章能够帮你解决Spark RDD 行动算子所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(76)

评论列表共有 0 条评论

立即
投稿
返回
顶部