概述
学习目标:
掌握MapReduce单词统计原理
学习内容:
-
文字描述
-
读取的数据为
-
hello,word,kafka,mapreduce,hadoop hello,word,kafka,mapreduce,hadoop hello,word,kafka,mapreduce,hadoop hello,word,kafka,mapreduce,hadoop hello,word,kafka,mapreduce,hadoop
-
-
1.首先利用InputFormat抽象类的子类TextOutputFormat从文件中读取数据
- TextOutputFormat,会一行一行的读取数据
- 读后的数据是一个一个的键值对形式
- <k1,v1>
- <第1行的偏移量,第1行>
- <0,“hello,word,kafka,mapreduce,hadoop”>
- <第2行的偏移量,第2行>,假设第二行的偏移量为23
- <23,“hello,word,kafka,mapreduce,hadoop”>
- 。。。
-
2.map阶段将每个<k1,v1>进行处理,map阶段需要编写java程序,实现处理的逻辑
-
map阶段会把<k1,v1> ==> <k2,v2>
- <k2,v2>
- <hello,1>
- <word,1>
- <kafka,1>
- <mapreduce,1>
- <hadoop,1>
- <hello,1>
- 。。。
-
代码逻辑
-
public static class MyMapper extends Mapper<LongWritable,Text, Text,LongWritable>{ //重写map方法 /* 把<k1,v1> --> <k2,v2> <0,hadoop,mapReduce,flink,spark> --> <hadoop,1> <mapReduce,1> <flink,1> <spark,1> */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //获取一行的文本数据,并分割 String[] splits = value.toString().split(","); //遍历分割后的数据 for (String split : splits) { //将k2,v2写入上下文 context.write(new Text(split),new LongWritable(1)); } } }
-
-
-
3.shuffle阶段,会把<k2,v2> ==>新<k2,v2>,
-
shuffle阶段有四个阶段:分区、排序、规约、分组,这里不对shuflle阶段编写逻辑,使用默认逻辑(默认逻辑不需要编写代码),默认逻辑处理后的<k2,v2>
-
<k2,v2>
-
<hello,<1,1,1,1>>
-
<word,<1,1,1,1>>
-
。。。
-
-
把相同key的value保存到同一集合<1,1,1,1>
-
-
4.reduce阶段,把<k2,v2>转换为<k3,v3>,reduce阶段也是需要编写代码把每个键值的values集合<1,1,1,1>变为4,key不变
-
<k3,v3>
-
<hello,4>
-
<word,4>
-
。。。
-
代码逻辑
-
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ //重写reduce方法 @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { //遍历value集合将1进行相加 <1,1,1> --> <3> long count = 0; for (LongWritable value : values) { //注意value是LongWritable类型,需要调用get方法获取long类型 count += value.get(); } //写入上下文环境 context.write(key, new LongWritable(count)); } }
-
-
-
5.输出为文本文件,需要OutputFormat的子类TextOutputFormat
-
完整代码(含注释)
-
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; /*需要统计的单词: hadoop,mapReduce,flink,spark hello,word,zookeeper,kafka hadoop,mapReduce,flink,spark hello,word,zookeeper,kafka hadoop,mapReduce,flink,spark hello,word,zookeeper,kafka hadoop,mapReduce,flink,spark hello,word,zookeeper,kafka */ public class WordCount { //map处理逻辑 /** * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN: k1数据类型 * VALUEIN:v1数据类型 * * KEYOUT: k2数据类型 * VALUEOUT:v2数据类型 */ /*k1是这一行对应开头的偏移量,v1是这一行的数据 k1 v1 0 hadoop,mapReduce,flink,spark 20 hello,word,zookeeper,kafka . . . --------------------------------------------------- k2 v2 hadoop 1 mapReduce 1 flink 1 spark 1 . . . hadoop 1 mapReduce 1 . . . */ public static class MyMapper extends Mapper<LongWritable,Text, Text,LongWritable>{ //重写map方法 /* 把<k1,v1> --> <k2,v2> <0,hadoop,mapReduce,flink,spark> --> <hadoop,1> <mapReduce,1> <flink,1> <spark,1> */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //获取一行的文本数据,并分割 String[] splits = value.toString().split(","); //遍历分割后的数据 for (String split : splits) { //将k2,v2写入上下文 context.write(new Text(split),new LongWritable(1)); } } } //shuffle处理逻辑 /** * shuffle处理逻辑一般不写,但是会有默认的处理 */ /* <k2,v2> --> 新的<k2,v2> <hadoop,1> --> <hadoop,<1,1,1>> 集合中1的数量对应hadoop出现的次数 */ //reduce处理逻辑 /** * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN:k2数据类型 * VALUEIN:v2数据类型 * * KEYOUT:k3数据类型 * VALUEOUT:v3数据类型 */ /* <k2,v2> --> <k3,v3> <hadoop,<1,1,1>> --> <hadoop,3> */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ //重写reduce方法 @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { //遍历value集合将1进行相加 <1,1,1> --> <3> long count = 0; for (LongWritable value : values) { //注意value是LongWritable类型,需要调用get方法获取long类型 count += value.get(); } //写入上下文环境 context.write(key, new LongWritable(count)); } } //主方法 public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { //创建配置环境 Configuration configuration = new Configuration(); //创建job任务对象 Job job = Job.getInstance(configuration, "WordCount"); //如果打包运行出错,则需要加该配置 job.setJarByClass(WordCount.class); //配置job对象(八个步骤) //1.指定文件读取方式和读取路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.setInputPaths(job, new Path("/wordcount")); //2.指定map阶段的处理方式和输出的数据类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //3. 4. 5. 6. 是shuffle阶段的分区、排序、规约、分组,使用默认方式。 //7.指定reduce阶段的处理方式和输出的数据类型 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //8.指定输出文件的方式和输出路径 job.setOutputFormatClass(TextOutputFormat.class); //判断输出路径是否存在,如果存在则删除, //获取FileSystem FileSystem fileSystem = FileSystem.get(new Configuration()); Path path = new Path("/wordcount_out"); //判断路径是否存在,如果存在则删除 if (fileSystem.exists(path)){ fileSystem.delete(path, true); } TextOutputFormat.setOutputPath(job, path); //等待任务结束 job.waitForCompletion(true); } }
-
-
最后
以上就是矮小夕阳为你收集整理的【Hadoop--MapReduce单词统计原理】学习目标:学习内容:的全部内容,希望文章能够帮你解决【Hadoop--MapReduce单词统计原理】学习目标:学习内容:所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复