我是靠谱客的博主 端庄白羊,最近开发中收集的这篇文章主要介绍mapPartition,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

mapPartitions函数会对每个分区依次调用分区函数处理,然后将处理的结果(若干个Iterator)生成新的RDDs。
mapPartitions与map类似,但是如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。

 

SparkConf conf = new SparkConf();

       SparkSession session = new SparkSession.Builder()

               .config(conf)

               .appName("RunModifyiedFilm:shengjunyang@maoyan.com").master("local")

               .getOrCreate();

       jsc = new JavaSparkContext(session.sparkContext());

 List<Integer> data = Arrays.asList(1243567);

       JavaRDD<Integer> list = jsc.parallelize(data,2);

       api.testSparkCoreApiMapPartitions(list);

代码一

/**

    * mapPartitions

    * @param rdd

    * 返回JavaRDD

    */

   public void testSparkCoreApiMapPartitions(JavaRDD<Integer> rdd){

       JavaRDD<Integer> mapPartitionsRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {

           @Override

           public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {

               int isum = 0;

               LinkedList<Integer> linkedList = new LinkedList<Integer>();

               while(integerIterator.hasNext()) {

                   isum += integerIterator.next();

                   linkedList.add(isum);

               }

               return linkedList.iterator();

           }

       });

       System.out.println(mapPartitionsRDD.collect());

   }

 

分为两个分区,mapPartitions本意和map一样,主要是为了提高加速性能,计算结果:

代码二

JavaRDD<String> list3 = jsc.parallelize(Arrays.asList("a,1","b,2","a,3","b,4"));

        api.testSparkCoreApiGroupByKey(list3);

/**

pair1RDD数据集("a,1"),("b,2"),("a,3"),("b,4")

结果:(a,[(a,1), (a,3)])

    (b,[(b,2), (b,4)])

/

 public static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd) {

        JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() {

            @Override

            public Tuple2<String, Integer> call(String s) throws Exception {

                String[] st = s.split(",");

                return new Tuple2(st[0], Integer.valueOf(st[1]));

            }

        });

        pairRDD.groupBy(p->p._1()).collect().forEach(p-> System.out.println(p.toString()));

}

最后

以上就是端庄白羊为你收集整理的mapPartition的全部内容,希望文章能够帮你解决mapPartition所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部