我是靠谱客的博主 大胆唇彩,最近开发中收集的这篇文章主要介绍一起学习Spark——Transformation算子,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

操作系统:CentOS-7.8
Spark版本:2.4.4
scala版本:2.11.12

本篇文章锤子和大家一起学习Spark RDD的常用Transformation算子,在文章中把转换算子分为了六大类:转换操作、过滤操作、集合操作、排序操作、聚合操作、分区操作,锤子会对每个算子含义和入参进行说明,并附上演示代码,帮助大家快速理解和使用这些常用算子(由于Spark的RDD算子还是比较多的,本篇文章主要列出的是一些常用的,后续如果学习更多了再继续补充),完整示例代码的GitHub地址:https://github.com/telundusiji/dream-hammer/tree/master/module-8

转换操作

map

说明

把RDD中的数据一对一的转换成另一种形式

方法


def map[U: ClassTag](f: T => U): RDD[U]

参数

  • f -> 原RDD转向新RDD的过程,传入函数的参数是原RDD数据,函数返回值是经过转换后的新RDD数据

示例


//示例中是将String 进行分割成 String[]  
//所以转换后的RDD的每一条数据就是一个数组,在最后我们打印了数组的长度
def mapTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      "Hadoop HBase FLink",
      "Hello world",
      "你好 Spark"
    ))
    sourceRdd
      .map(item => item.split(" "))
      .collect()
      .foreach(item => println(item.length))

    sc.stop()
}

//运行结果为
/*
3
2
2
*/

flatMap

说明

flatMap和map类似,但是flatMap是把RDD中的数据一对多的转换成另一种形式

方法


def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

参数

  • f -> 原RDD数据,返回值经过转换后的新RDD数据,传入函数的入参是原RDD数据,传入函数的出参是一个集合,集合被展平后的数据才是新的RDD数据

示例


//和map示例的初始数据相同,使用flatMap后每一条数据被分割后的字符串数字,会被展平,产生多条数据
def flatMapTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      "Hadoop HBase FLink",
      "Hello world",
      "你好 Spark"
    ))
    sourceRdd
      .flatMap(item => item.split(" "))
      .collect()
      .foreach(item => println(item))

    sc.stop()
}
  
//运行结果
/*
Hadoop
HBase
FLink
Hello
world
你好
Spark
*/

mapPartitions

说明

针对整个分区的数据进行转换。

与map区别

  • map针对每一条数据进行处理,粒度是一条数据 item=>item;mapPartitions每次一个分区的数据,粒度是分区,iter=>iter

方法


  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]

参数

  • f -> 原RDD数据,返回值经过转换后的新RDD数据,传入函数的入参是原RDD的分区的迭代器(集合),传入函数的出参是转换后的数据的迭代器(集合)

  • preservesPartitioning:是否保留父RDD的分区信息,默认false

示例


/**
* mapPartitions对一整个分区数据进行转换
* 与map区别
* 1.map的func参数是单条数据,mapPartitions的func的参数是一个分区的数据,及一个集合
* 2.map的func返回值是单条数据,mapPartitions的func的返回值是一个集合
*/
//每次处理一个分区数据,每个分区是一个迭代器,使用迭代器就可以对每条数据转换,最后再返回迭代器
def mapPartitionsTest(): Unit = {
	val sourceRdd = sc.parallelize(Seq(1, 2, 3, 4), 3)
	sourceRdd
	    .mapPartitions(partition => {
	    partition.map(item => item * 10)
	    })
	    .collect()
	    .foreach(item => println(item))
	
	sc.stop()
}
//运行结果
/*
10
20
30
40
*/

mapPartitionsWithIndex

说明

针对整个分区的数据进行转换,并且再传入分区数据时会附带分区的索引编号

方法


def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]

参数

  • f -> 原RDD数据,返回值经过转换后的新RDD数据,传入函数的入参是原RDD的分区的索引以及分区数据迭代器(集合),传入函数的出参是转换后的数据的迭代器(集合)

  • preservesPartitioning:是否保留父RDD的分区信息,默认false

示例


/**
* mapPartitionsWithIndex和mapPartitions的区别就是再mapPartitionsWithIndex中func的入参多个分区编号
*/
//与mapPartitions演示案例不同的就是,再转换每条数据时,将其与分区索引编号相乘
def mapPartitionsWithIndexTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(1, 2, 3, 4), 3)
    sourceRdd
      .mapPartitionsWithIndex((index, partition) => {
        partition.map(item => index * item)
      })
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
0
2
6
8
*/

mapValues

作用

只能使用在RDD内数据为Key-Value类型的RDD,使用函数对数据进行转换,与map不同的是,mapValues只转换Key-Value类型数据的value

方法


def mapValues[U](f: V => U): RDD[(K, U)]

参数

  • f -> 原RDD转向新RDD的过程,传入函数的参数是原RDD每条k-v数据的value,函数返回值是经过转换后的新RDD数据的每条k-v数据的value

演示


/**
* mapValues与map相似,不同的是map作用于整条数据,mapValues作用于每条Key-Value类型数据的value
*/
//对源rdd的key-value类型数据的value进行转换
def mapValuesTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 3)
    sourceRdd
      .mapValues(itemValue => itemValue * 10)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(a,10)
(b,20)
(c,30)
(d,40)
*/

join

说明

将两个RDD按照相同的Key进行连接

方法


def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

参数

  • other:进行join操作的另一个RDD

示例


//将rdd1与rdd2按照key进行join
def joinTest(): Unit = {
    val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4)))
    val rdd2 = sc.parallelize(Seq(("a", 5), ("a", 6)))
    rdd1
      .join(rdd2)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(a,(1,5))
(a,(1,6))
(a,(2,5))
(a,(2,6))
*/

过滤操作

filter

说明

filter算子是过滤掉RDD中不需要的内容

方法


def filter(f: T => Boolean): RDD[T]

参数

  • f -> 过滤RDD中的数据,传入函数的入参是原RDD中的一条数据,传入函数返回值是一个Boolean类型的值,ture代表该条数据保留,false代表该条数据去除

示例


/**
* filter过滤掉数据集中不符合要求的元素
* filter的func的入参是RDD的一个元素,返回值是boolean类型值,true代表保留该数据,false代表去除该数据
*/
//过滤数据,把值大于2的数据保留
def filterTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 3)
    sourceRdd
      .filter(item => item > 2)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
3
4
5
*/

sample

说明

sample是从一个数据集中抽样出一部分,常被用来在尽可能减少数据规律损失的情况下,减少数据集以保证运行速度

方法


 def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]

参数

  • withReplacement:抽样出来的数据是否有放回,ture代表有放回,false代表不放回

  • fraction:抽样比列

  • seed:随机数种子

示例


//对源数据进行两种情况抽样演示,
//第一种无放回抽样,则抽样结果不会出现重复
//第二种有放回抽象,则抽样结果可能出现重复
def sampleTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 3)
    //无放回
    System.out.println("抽样无放回演示")
    sourceRdd
      .sample(false, 0.8, 100)
      .collect()
      .foreach(item => println(item))
    //有放回
    System.out.println("抽样有放回演示")
    sourceRdd
      .sample(true, 0.8, 100)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
抽样无放回演示
1
3
4
5
抽样有放回演示
1
1
2
5
*/

集合操作

intersection

说明

对两个rdd中元素进行交集操作

方法


//交集总共有三个重载方法,如下

def intersection(other: RDD[T]): RDD[T]

def intersection(
    other: RDD[T],
    partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    
def intersection(other: RDD[T], numPartitions: Int): RDD[T]

参数

  • other:进行交集的另一个RDD

  • partitioner:交集操作后生成RDD的分区器(分区方式)

  • numPartitions:交集操作后生成RDD的分区数

演示


def intersectionTest(): Unit = {
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))
    rdd1
      .intersection(rdd2)
      .collect()
      .foreach(item => println(item))

    val rdd3 = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val rdd4 = sc.parallelize(Seq((2, "b"), (3, "d"), (4, "d")))

    rdd3
      .intersection(rdd4)
      .collect()
      .foreach(println(_))

    sc.stop()
}

//运行结果
/*
4
5
(2,b)
*/

union

说明

对两个RDD中的元素进行并集操作

方法


def union(other: RDD[T]): RDD[T]

参数

  • other:进行交集的另一个RDD

演示


def unionTest(): Unit = {
    val rdd1 = sc.parallelize(Seq(1, 2, 3))
    val rdd2 = sc.parallelize(Seq(2, 3, 4))
    rdd1
      .union(rdd2)
      .collect()
      .foreach(item => println(item))
    
    sc.stop()
}

//运行结果
/*
1
2
3
2
3
4
*/

subtract

说明

调用该操作的RDD与入参中传入RDD的差集

方法


//差集的三个重载方法

def subtract(other: RDD[T]): RDD[T]

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

参数

  • other:对其进行差集操作的另一个RDD

  • numPartitions:差集操作后生成新的RDD的分区数

  • partitioner:差集操作后生成新的RDD的分区方式

演示


//因为差集是xx对xx的差,所以是分方向,即A对B的差集和B对A的差集结果是不一样的
def subtractTest(): Unit = {
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))
    System.out.println("rdd1.subtract(rdd2)")
    rdd1
      .subtract(rdd2)
      .collect()
      .foreach(item => println(item))

    System.out.println("rdd1.subtract(rdd2)")
    rdd2
      .subtract(rdd1)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
rdd1.subtract(rdd2)
1
2
3
rdd2.subtract(rdd1)
6
7
8
*/

排序操作

sortByKey

说明

对RDD内元素为Key-Value类型数据时,按照key排序

方法


def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

参数

  • ascending:是否升序,默认是true代表升序,设置为false时代表降序

  • numPartitions:排序后RDD的分区数

演示


def sortByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq((2, "a"), (1, "b"), (3, "c")))

    sourceRdd
      .sortByKey()
      .collect()
      .foreach(println(_))

    sc.stop()
}

//运行结果
/*
(1,b)
(2,a)
(3,c)
*/

sortBy

说明

根据指定的元素的属性进行排序

方法


def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

参数

  • f -> 该函数返回要排序的字段

  • ascending:是否升序,默认是true代表升序,设置为false时代表降序

  • numPartitions:排序后RDD的分区数

演示


def sortByTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq((2, "a"), (1, "b"), (3, "c")))

    sourceRdd
      .sortBy(item => item._2,false)
      .collect()
      .foreach(println(_))

    sc.stop()
}

//运行结果
/*
(3,c)
(1,b)
(2,a)
*/

聚合操作

reduceByKey

说明

先按照Key进行分组生成一个Tuple,然后针对每个分组再进执行reduce算子

方法


def reduceByKey(func: (V, V) => V): RDD[(K, V)]

参数

  • func -> 执行数据处理的函数,传入的函数有两个入参,第一个入参是局部聚合值,第二个入参是当前值,函数有一个输出为汇总结果

示例


//一个年级和名称的集合,按照key分组后,进行汇总,
//即“张三、王五、赵六”属于同一个key,“李四”属于另一个Key,相同Key所对应的数据会汇总在一起
def reduceByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      ("一年级","张三"),
      ("二年级","李四"),
      ("一年级","王五"),
      ("一年级","赵六")
    ))
    sourceRdd
      .reduceByKey((agg, curr) => curr + "-" + agg)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(二年级,李四)
(一年级,张三-王五-赵六)
*/


groupByKey

说明

对于Key-Value数据类型RDD的元素,按照Key进行分组,不对分组后的结果聚合

方法


//groupByKey的三个重载方法

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

参数

  • numPartitions:操作结果RDD的分区数

  • partitioner:操作结果RDD的分区器

注意

  • groupByKey是一个shuffled操作

  • groupByKey需要列举key对应的所有数据,所以无法在Map端进行Combine操作,所以groupByKey性能没有reduceByKey性能好

示例


def groupByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      ("一年级", "张三"),
      ("二年级", "李四"),
      ("一年级", "王五"),
      ("一年级", "赵六")
    ))
    sourceRdd
      .groupByKey()
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(二年级,CompactBuffer(李四))
(一年级,CompactBuffer(张三, 王五, 赵六))
*/

combineByKey

说明

对数据集按照key来聚合,用户可以自定义聚合的初步转换,以及在每个分区进行初步聚合和在所有分区上最终聚合结果

方法


def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]

参数

  • createCombiner:对value的值进行初步转换,函数入参为Key-value数据的value

  • mergeValue:在每个分区上把上一步转换的结果进行聚合,函数入参为createCombiner的出参以及下一条数据的value

  • mergeCombiners:在所有分区上把每个分区的结果再进行聚合

  • partitioner:分区器

  • mapSideCombine:是否在map端combine

  • serializer:序列化器

其他

  • groupByKey和reduceByKey的底层都是combineByKey

示例


def combineByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      ("一年级", "张三"),
      ("二年级", "李四"),
      ("一年级", "王五"),
      ("一年级", "赵六")
    ))
    sourceRdd
      .combineByKey(
        createCombiner = curr => (curr, 1),
        mergeValue = (agg: (String, Int), nextValue) => (agg._1 + nextValue, agg._2 + 1),
        mergeCombiners = (agg: (String, Int), curr: (String, Int)) => (curr._1 + "|" + agg._1, curr._2 + agg._2))
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(二年级,(李四,1))
(一年级,(赵六|王五|张三,3))
*/

foldByKey

说明

和reduceByKey类似,都是按照Key进行分组聚合,不同点是foldByKey在聚合前会对每条数据加上初始值

方法


//三个重载方法

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

参数

  • zeroValue:初始值

  • func -> 和reduceByKey一致,执行数据处理的函数,传入的函数有两个入参,第一个入参是局部聚合值,第二个入参是当前值,函数有一个输出为汇总结果

  • numPartitions:操作结果RDD的分区数

  • partitioner:操作结果RDD的分区器

示例


//在进行聚合操作前会对每一条数据都加上一个初识值
def foldByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      ("一年级", "张三"),
      ("二年级", "李四"),
      ("一年级", "王五"),
      ("一年级", "赵六")
    ))
    sourceRdd
      .foldByKey("|")((agg, curr) => curr + agg)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(二年级,李四|)
(一年级,赵六|王五|张三|)
*/

aggregateByKey

说明

聚合所有相同的Key的value,并且聚合前可以使用初识值对每条数据进行操作

方法


//同样有三个重载,另外两个跟其他算子类似多了一个分区方法入参,所以这里仅列出主要的一个

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]

参数

  • zeroValue:指定初始值

  • seqOp -> 转换每一个值的函数

  • combOp -> 将转换的值聚合的函数

示例


//先对每一条值的左右两边加上“|”,然后再聚合
def aggregateByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(
      ("一年级", "张三"),
      ("二年级", "李四"),
      ("一年级", "王五"),
      ("一年级", "赵六")
    ))
    sourceRdd
      .aggregateByKey("|")((zeroValue, item) => zeroValue + item + zeroValue, (agg, curr) => curr + agg)
      .collect()
      .foreach(item => println(item))

    sc.stop()
}

//运行结果
/*
(二年级,|李四|)
(一年级,|赵六||王五||张三|)
*/

分区操作

repartitions

说明

对RDD重分区,可以把分区数调大,也可以把分区数调小

方法


def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

参数

  • numPartitions:指定分区数

示例


//调整分区并指定分区数
def repartitionByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7), 3)
    System.out.println("调大分区数:" + sourceRdd.repartition(5).getNumPartitions)
    System.out.println("调小分区数" + sourceRdd.repartition(2).getNumPartitions)
    sc.stop()
}

//运行结果
/*
调大分区数:5
调小分区数2
*/

coalesce

说明

对分区数进行调整,默认只能调小,不能调大,当在参数中指定允许shuffle时,才可以调大

方法


def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T]

参数

  • numPartitions:重分区的分区数

  • shuffle:是否允许shuffle,默认false,即在重分区不允许shuffle,则不能扩大分区数

  • partitionCoalescer:分区合并器,可选项,默认不指定

示例


//在不允许shuffle的情况下,分区数无法扩大
def coalesceByKeyTest(): Unit = {
    val sourceRdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7), 3)
    System.out.println("调大分区数:" + sourceRdd.coalesce(5).getNumPartitions)
    System.out.println("调大分区数:" + sourceRdd.coalesce(5, true).getNumPartitions)
    System.out.println("调小分区数" + sourceRdd.coalesce(2).getNumPartitions)
    sc.stop()
}

//运行结果
/*
调大分区数:3
调大分区数:5
调小分区数2
*/

###总结

本篇文章主要是对RDD的常见转换算子进行了说明并附带了示例,帮助大家学习, 由于RDD的算子比较多,所以本篇文章没有全面覆盖,后续学习过程中锤子再进行补充

个人公众号【爱做梦的锤子】,全网同id,个站 http://te-amo.site,欢迎关注,里面会分享更多有用知识

觉得不错就点个赞叭QAQ

最后

以上就是大胆唇彩为你收集整理的一起学习Spark——Transformation算子的全部内容,希望文章能够帮你解决一起学习Spark——Transformation算子所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部