概述
mapPartitions函数会对每个分区依次调用分区函数处理,然后将处理的结果(若干个Iterator)生成新的RDDs。
mapPartitions与map类似,但是如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
SparkConf conf =
new
SparkConf();
SparkSession session =
new
SparkSession.Builder()
.config(conf)
.appName(
"RunModifyiedFilm:shengjunyang@maoyan.com"
).master(
"local"
)
.getOrCreate();
jsc =
new
JavaSparkContext(session.sparkContext());
List<Integer> data = Arrays.asList(
1
,
2
,
4
,
3
,
5
,
6
,
7
);
JavaRDD<Integer> list = jsc.parallelize(data,
2
);
api.testSparkCoreApiMapPartitions(list);
代码一
/**
* mapPartitions
* @param rdd
* 返回JavaRDD
*/
public
void
testSparkCoreApiMapPartitions(JavaRDD<Integer> rdd){
JavaRDD<Integer> mapPartitionsRDD = rdd.mapPartitions(
new
FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
public
Iterator<Integer> call(Iterator<Integer> integerIterator)
throws
Exception {
int
isum =
0
;
LinkedList<Integer> linkedList =
new
LinkedList<Integer>();
while
(integerIterator.hasNext()) {
isum += integerIterator.next();
linkedList.add(isum);
}
return
linkedList.iterator();
}
});
System.out.println(mapPartitionsRDD.collect());
}
分为两个分区,mapPartitions本意和map一样,主要是为了提高加速性能,计算结果:
代码二
JavaRDD<String> list3 = jsc.parallelize(Arrays.asList(
"a,1"
,
"b,2"
,
"a,3"
,
"b,4"
));
api.testSparkCoreApiGroupByKey(list3);
/**
pair1RDD数据集(
"a,1"
),(
"b,2"
),(
"a,3"
),(
"b,4"
)
结果:(a,[(a,
1
), (a,
3
)])
(b,[(b,
2
), (b,
4
)])
/
public
static
void
testSparkCoreApiGroupByKey(JavaRDD<String> rdd) {
JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(
new
PairFunction<String, String, Integer>() {
@Override
public
Tuple2<String, Integer> call(String s)
throws
Exception {
String[] st = s.split(
","
);
return
new
Tuple2(st[
0
], Integer.valueOf(st[
1
]));
}
});
pairRDD.groupBy(p->p._1()).collect().forEach(p-> System.out.println(p.toString()));
}
最后
以上就是端庄白羊为你收集整理的mapPartition的全部内容,希望文章能够帮你解决mapPartition所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复