概述
map和mapParttion都是spark的算子,他们在进行数据处理时有一定的区别:
- map是RDD中的每一个元素进行操作。
- mapPartition是对RDD的每一个分区的迭代器进行操作,返回的是迭代器。
mapPartiton的优势:
- 提高性能,比如我们对一个含有100条log数据的分区进行操作,使用map的话函数要执行100次计算。使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。如果map执行的过程中还需要创建对象,比如创建redis连接,jdbc连接等。map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接。
mapPartiton的缺点:
- 对于一个partition有很多数据的话,一次函数处理可能会导致OOM。普通的map一般不会导致OOM。
比如以下代码,分析某个报表的时候,我们用mapPartition进行处理,这样一个partition我们创建了一次redis连接,和一个ListBuffer,然后遍历这个分区,将数据存储到ListBuffer中。需要注意的是mapPartition返回的是iterator。
package DMP0505.Repoort
import DMP0505.Bean.Log
import DMP0505.Util.{JedisPools, RptUtils}
import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis
object AppAnalyseRptV2 {
def main(args: Array[String]): Unit = {
if (args.length != 2) {
println(
"""
|cn.dmp.report.AppAnalyseRptV2
|参数:
| 输入路径
| 输出路径
""".stripMargin)
sys.exit()
}
val Array(inputPath, outputPath) = args
// 2 创建sparkconf->sparkContext
val sparkConf = new SparkConf()
sparkConf.setAppName(s"${this.getClass.getSimpleName}")
sparkConf.setMaster("local[*]")
// RDD 序列化到磁盘 worker与worker之间的数据传输
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(sparkConf)
sc.textFile("inputPath")
.map(_.split(",", -1))
.filter(_.length >= 85)
.map(Log(_)).filter(log => !log.appname.isEmpty || !log.appid.isEmpty)
//为什么要对分区进行操作? map和mapPartition区别,普通的partition如果有1000条数据使用map要调用1000次
//如果使用mapPartition,调用一次function就可以处理1000条数据,但数据量大了可能OOM
//而且本例中有创建jedis ,使用map的话每次都要创建jedis连接,用partition的话只需要创建一次redis就可以处理一个partition
.mapPartitions(itr => {
val jedis: Jedis = JedisPools.getResource()
val parResult = new collection.mutable.ListBuffer[(String, List[Double])]()
//遍历分区的所有数据,查询redis(把appname为空的数据进行转换),将结果放入到Listbuffer
itr.foreach(log => {
var newAppName = log.appname
if (StringUtils.isEmpty(newAppName)) {
newAppName = jedis.get(log.appname)
}
val req = RptUtils.caculateReq(log.requestmode, log.processnode)
val rtb = RptUtils.caculateRtb(log.iseffective, log.isbilling, log.isbid, log.adorderid, log.iswin, log.winprice, log.adpayment)
val showClick = RptUtils.caculateShowClick(log.requestmode, log.iseffective)
parResult += ((newAppName, req ++ rtb ++ showClick))
})
jedis.close()
parResult.iterator
}).reduceByKey((list1, list2) => {
list1.zip(list2).map(t => t._1 + t._2)
}).map(t => t._1 + "," + t._2.mkString(",")).saveAsTextFile("outputPath")
}
}
最后
以上就是欢呼红牛为你收集整理的Spark算子--map和mapPartition的区别,配实例讲解的全部内容,希望文章能够帮你解决Spark算子--map和mapPartition的区别,配实例讲解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复