mapPartitions函数会对每个分区依次调用分区函数处理,然后将处理的结果(若干个Iterator)生成新的RDDs。
mapPartitions与map类似,但是如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
SparkConf conf = new SparkConf();
SparkSession session = new SparkSession.Builder()
.config(conf)
.appName("RunModifyiedFilm:shengjunyang@maoyan.com").master("local")
.getOrCreate();
jsc = new JavaSparkContext(session.sparkContext());
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> list = jsc.parallelize(data,2);
api.testSparkCoreApiMapPartitions(list);
代码一
/**
* mapPartitions
* @param rdd
* 返回JavaRDD
*/
public void testSparkCoreApiMapPartitions(JavaRDD<Integer> rdd){
JavaRDD<Integer> mapPartitionsRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
int isum = 0;
LinkedList<Integer> linkedList = new LinkedList<Integer>();
while(integerIterator.hasNext()) {
isum += integerIterator.next();
linkedList.add(isum);
}
return linkedList.iterator();
}
});
System.out.println(mapPartitionsRDD.collect());
}
分为两个分区,mapPartitions本意和map一样,主要是为了提高加速性能,计算结果:
代码二
JavaRDD<String> list3 = jsc.parallelize(Arrays.asList("a,1","b,2","a,3","b,4"));
api.testSparkCoreApiGroupByKey(list3);
/**
pair1RDD数据集("a,1"),("b,2"),("a,3"),("b,4")
结果:(a,[(a,1), (a,3)])
(b,[(b,2), (b,4)])
/
public static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd) {
JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
String[] st = s.split(",");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});
pairRDD.groupBy(p->p._1()).collect().forEach(p-> System.out.println(p.toString()));
}
最后
以上就是端庄白羊最近收集整理的关于mapPartition的全部内容,更多相关mapPartition内容请搜索靠谱客的其他文章。
发表评论 取消回复