我是靠谱客的博主 拼搏战斗机,这篇文章主要介绍Spark之WordCount,现在分享给大家,希望可以做个参考。

package com.uplooking.bigdata.core.p1;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
 
import java.util.Arrays;
import java.util.List;
 
/**
 * Java8之后的函数式编程
 * 在集群中运行程序
 * 遇到的问题:
 * java.lang.IllegalArgumentException: java.net.UnknownHostException: ns1
     需要在给spark运行程序中指明一个hdfs中的conf的配置信息
     所以在SPARK_HOME/conf/spark-default.conf中配置
 
     spark.files /opt/hadoop/etc/hadoop/hdfs-site.xml,/opt/hadoop/etc/hadoop/core-site.xml
     这一句话只需要在提交spark程序的那台机器上进行配置说明即可,当然配置成功之后,最好重启spark集群,
     当然不重启也无所谓!
 */
public class SparkWordCountApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                    .setAppName(SparkWordCountApp.class.getSimpleName())
                    .setMaster("spark://master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //加载数据
        //从hdfs上面加载数据
        JavaRDD<String> linesRDD = sc.textFile("hdfs://master:9000/hello");
 
        JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> {
            return Arrays.asList(line.split(" "));
        });
 
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(word -> {
            return new Tuple2<String, Integer>(word, 1);
        });
 
        JavaPairRDD<String, Integer> retRDD = pairRDD.reduceByKey((v1, v2) -> {
            return v1 + v2;
        });
 
        //这里在集群上面运行的时候,是看不到输出结果的
        /*retRDD.foreach(t -> {
            System.out.println(t);
        });
            foreach的操作,是遍历rdd中的每一个partition中的每一条数据,
            这些partition在哪里?
                这些partition在worker上面,那么执行上述foreach是不是就应该在partition对应的worker节点上面
               显示内容,我们是在作业启动的机器上面查看内容的,所以就看不到!
            为什么使用了collect就又能够看到了呢?
                collect和foreach一样,也是一个action的操作,作用时间计算后在partition上面的数据,拉回到
             到作业提交的这台机器上面,那么这个时候,再在作业提交的机器上面遍历数据,就有内容了!
 
             我们把提交作业的这台机器称之为Driver,只不过因为咱们资源有限,所以Driver和Master作为一台机器了,
             在工作中,这些Driver都是单独的机器,而且不止一台Driver节点。
        */
        List<Tuple2<String, Integer>> list = retRDD.collect();
        for(Tuple2<String, Integer> t : list) {
            System.out.println(t);
        }
    }
}

最后

以上就是拼搏战斗机最近收集整理的关于Spark之WordCount的全部内容,更多相关Spark之WordCount内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部