package com.uplooking.bigdata.core.p1;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
* Java8之后的函数式编程
* 在集群中运行程序
* 遇到的问题:
* java.lang.IllegalArgumentException: java.net.UnknownHostException: ns1
需要在给spark运行程序中指明一个hdfs中的conf的配置信息
所以在SPARK_HOME/conf/spark-default.conf中配置
spark.files /opt/hadoop/etc/hadoop/hdfs-site.xml,/opt/hadoop/etc/hadoop/core-site.xml
这一句话只需要在提交spark程序的那台机器上进行配置说明即可,当然配置成功之后,最好重启spark集群,
当然不重启也无所谓!
*/
public class SparkWordCountApp {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName(SparkWordCountApp.class.getSimpleName())
.setMaster("spark://master:7077");
JavaSparkContext sc = new JavaSparkContext(conf);
//加载数据
//从hdfs上面加载数据
JavaRDD<String> linesRDD = sc.textFile("hdfs://master:9000/hello");
JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> {
return Arrays.asList(line.split(" "));
});
JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(word -> {
return new Tuple2<String, Integer>(word, 1);
});
JavaPairRDD<String, Integer> retRDD = pairRDD.reduceByKey((v1, v2) -> {
return v1 + v2;
});
//这里在集群上面运行的时候,是看不到输出结果的
/*retRDD.foreach(t -> {
System.out.println(t);
});
foreach的操作,是遍历rdd中的每一个partition中的每一条数据,
这些partition在哪里?
这些partition在worker上面,那么执行上述foreach是不是就应该在partition对应的worker节点上面
显示内容,我们是在作业启动的机器上面查看内容的,所以就看不到!
为什么使用了collect就又能够看到了呢?
collect和foreach一样,也是一个action的操作,作用时间计算后在partition上面的数据,拉回到
到作业提交的这台机器上面,那么这个时候,再在作业提交的机器上面遍历数据,就有内容了!
我们把提交作业的这台机器称之为Driver,只不过因为咱们资源有限,所以Driver和Master作为一台机器了,
在工作中,这些Driver都是单独的机器,而且不止一台Driver节点。
*/
List<Tuple2<String, Integer>> list = retRDD.collect();
for(Tuple2<String, Integer> t : list) {
System.out.println(t);
}
}
}
最后
以上就是拼搏战斗机最近收集整理的关于Spark之WordCount的全部内容,更多相关Spark之WordCount内容请搜索靠谱客的其他文章。
发表评论 取消回复