概述
2.1 Spark中WordCount的执行流程
-
Spark的WordCount示例
package com.chh.spark.core.wc
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_WordCount {
def main(args: Array[String]): Unit = {
// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster( "local[*]").setAppName("WordCount")
// 创建Spark的上下文对象
val sc = new SparkContext(sparkConf)
// 读取文件
val source = sc.textFile("input/word.txt")
// 将文件中的数据进行分词
val word = source.flatMap(_.split(","))
// 将单词转为(x,1)
val wordToOne = word.map((_, 1))
// 将转换后的数据进行分组求和
val sumByKey = wordToOne.reduceByKey(_ + _)
//将数据收集到Drive端
val result = sumByKey.collect()
//对结果进行打印
result.foreach(println)
//关闭Spark连接
sc.stop()
}
}
-
WordCount中各个对象和方法的主要作用
-
SparkConf
创建SparkConf对象,设置Spark应用的配置信息。setAppName() 设置Spark应用程序在运行中的名字;如果是集群运行,就可以在监控页面直观看到我们运行的job任务。setMaster() 设置运行模式、是本地运行,设置为local即可;如果是集群运行,就可以设置程序要连接的Spark集群的master节点的url(localhost:7077)。
-
SparkContext
创建SparkContext对象, 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写,都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器(DAGSchedule、TaskScheduler),还会去Spark Master节点上进行注册等。所以SparkContext在Spark应用中是很重要的一个对象。
-
textFile
源码注释:
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. The text files must be encoded as UTF-8.
可以从本地文件系统或者HDFS上读取文件,将其转换为String类型的RDD数据集。注意:文件编码格式必须为UTF-8。
-
flatMap
-
A list of partitions
-
A function for computing each split
-
A list of dependencies on other RDD
-
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
-
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
它是RDD数据集对象的函数,作用于每个分区上,作用是将一条数据展开,也就是我们常说的一进多出。
-
-
map
学过python或者Java的人应该不陌生这个函数,典型的一进一出函数
-
reduceByKey
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
通过对key进行分组,然后求出每个key的value和,类似于Mapreduce中的combiner,所以也会产生shuffle。
-
collect
Return an array that contains all of the elements in this RDD. Note: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
说白了,就是将executor中的计算结果,发送到driver端的内存中,所以使用这个方法时,一定注意数据量的大小。
-
stop
关闭spark的连接
-
-
执行流程
2.2对比flink中WordCount的批处理
package com.zhisheng.data.sources;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class Main {
public static void main(String[] args) throws Exception{
// 创建flink流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置flink应用的并行度
env.setParallelism(1);
// 设置flink运行时的处理模式为批处理
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<String> dataStreamSource = env.readTextFile("books/word.txt");
dataStreamSource
.flatMap(
(String s, Collector<String> collector) -> {for (String string : s.split(",")) collector.collect(string);}
).returns(String.class)
.map(s -> Tuple2.of(s, 1)).returns(new TypeHint<Tuple2<String, Integer>>() {
})
.keyBy(t-> t.f0).sum(1)
.print();
env.execute("Flink add data source");
}
}
两者一对比就发现,Java语言处理数据时语法过于繁琐了,而Scala就显得简洁很多。当然,flink也是支持Scala的,而且flink1.16版本中优化了flink批处理的相关模块,flinkSQL未来或许可以如同sparkSQL一样无缝衔接hiveSQL。最最重要的,flink的流批一体已经很成熟了,一套架构,两种模式自动探查,牛逼~~
最后
以上就是机灵嚓茶为你收集整理的第二章 Spark入门案例2.1 Spark中WordCount的执行流程的全部内容,希望文章能够帮你解决第二章 Spark入门案例2.1 Spark中WordCount的执行流程所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复