我是靠谱客的博主 矮小夕阳,最近开发中收集的这篇文章主要介绍【Hadoop--MapReduce单词统计原理】学习目标:学习内容:,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

学习目标:

掌握MapReduce单词统计原理


学习内容:

  • 文字描述

    • 读取的数据为

      • hello,word,kafka,mapreduce,hadoop
        hello,word,kafka,mapreduce,hadoop
        hello,word,kafka,mapreduce,hadoop
        hello,word,kafka,mapreduce,hadoop
        hello,word,kafka,mapreduce,hadoop
        
    • 1.首先利用InputFormat抽象类的子类TextOutputFormat从文件中读取数据

      • TextOutputFormat,会一行一行的读取数据
      • 读后的数据是一个一个的键值对形式
        • <k1,v1>
        • <第1行的偏移量,第1行>
          • <0,“hello,word,kafka,mapreduce,hadoop”>
        • <第2行的偏移量,第2行>,假设第二行的偏移量为23
          • <23,“hello,word,kafka,mapreduce,hadoop”>
        • 。。。
    • 2.map阶段将每个<k1,v1>进行处理,map阶段需要编写java程序,实现处理的逻辑

      • map阶段会把<k1,v1> ==> <k2,v2>

        • <k2,v2>
        • <hello,1>
        • <word,1>
        • <kafka,1>
        • <mapreduce,1>
        • <hadoop,1>
        • <hello,1>
        • 。。。
      • 代码逻辑

        • public static class MyMapper extends Mapper<LongWritable,Text, Text,LongWritable>{
                  //重写map方法
                  /*
                  把<k1,v1> --> <k2,v2>
                  <0,hadoop,mapReduce,flink,spark> --> <hadoop,1>
                                                        <mapReduce,1>
                                                        <flink,1>
                                                        <spark,1>
                   */
                  @Override
                  protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                      //获取一行的文本数据,并分割
                      String[] splits = value.toString().split(",");
                      //遍历分割后的数据
                      for (String split : splits) {
                          //将k2,v2写入上下文
                          context.write(new Text(split),new LongWritable(1));
                      }
                  }
              }
          
    • 3.shuffle阶段,会把<k2,v2> ==>新<k2,v2>,

      • shuffle阶段有四个阶段:分区、排序、规约、分组,这里不对shuflle阶段编写逻辑,使用默认逻辑(默认逻辑不需要编写代码),默认逻辑处理后的<k2,v2>

        • <k2,v2>

        • <hello,<1,1,1,1>>

        • <word,<1,1,1,1>>

        • 。。。

      • 把相同key的value保存到同一集合<1,1,1,1>

    • 4.reduce阶段,把<k2,v2>转换为<k3,v3>,reduce阶段也是需要编写代码把每个键值的values集合<1,1,1,1>变为4,key不变

      • <k3,v3>

      • <hello,4>

      • <word,4>

      • 。。。

      • 代码逻辑

        •  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
                  //重写reduce方法
          
                  @Override
                  protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                      //遍历value集合将1进行相加  <1,1,1>  --> <3>
                      long count = 0;
                      for (LongWritable value : values) {
                          //注意value是LongWritable类型,需要调用get方法获取long类型
                          count += value.get();
                      }
                      //写入上下文环境
                      context.write(key, new LongWritable(count));
                  }
              }
          
    • 5.输出为文本文件,需要OutputFormat的子类TextOutputFormat

    • 完整代码(含注释)

      • import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.Mapper;
        import org.apache.hadoop.mapreduce.Reducer;
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
        
        import java.io.IOException;
        import java.net.URI;
        import java.net.URISyntaxException;
        
        /*需要统计的单词:
        hadoop,mapReduce,flink,spark
        hello,word,zookeeper,kafka
        hadoop,mapReduce,flink,spark
        hello,word,zookeeper,kafka
        hadoop,mapReduce,flink,spark
        hello,word,zookeeper,kafka
        hadoop,mapReduce,flink,spark
        hello,word,zookeeper,kafka
         */
        
        
        public class WordCount {
        
        
            //map处理逻辑
            /**
             * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
             *     KEYIN: k1数据类型
             *     VALUEIN:v1数据类型
             *
             *     KEYOUT: k2数据类型
             *     VALUEOUT:v2数据类型
             */
            /*k1是这一行对应开头的偏移量,v1是这一行的数据
                k1              v1
                0      hadoop,mapReduce,flink,spark
                20     hello,word,zookeeper,kafka
                .
                .
                .
            ---------------------------------------------------
                k2              v2
                hadoop          1
                mapReduce       1
                flink           1
                spark           1
                .
                .
                .
                hadoop          1
                mapReduce       1
                .
                .
                .
        
             */
            public static class MyMapper extends Mapper<LongWritable,Text, Text,LongWritable>{
                //重写map方法
                /*
                把<k1,v1> --> <k2,v2>
                <0,hadoop,mapReduce,flink,spark> --> <hadoop,1>
                                                      <mapReduce,1>
                                                      <flink,1>
                                                      <spark,1>
                 */
                @Override
                protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                    //获取一行的文本数据,并分割
                    String[] splits = value.toString().split(",");
                    //遍历分割后的数据
                    for (String split : splits) {
                        //将k2,v2写入上下文
                        context.write(new Text(split),new LongWritable(1));
                    }
                }
            }
        
            //shuffle处理逻辑
            /**
             * shuffle处理逻辑一般不写,但是会有默认的处理
             */
            /*
                <k2,v2>  --> 新的<k2,v2>
                <hadoop,1> --> <hadoop,<1,1,1>>  集合中1的数量对应hadoop出现的次数
             */
        
        
            //reduce处理逻辑
            /**
             * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
             *     KEYIN:k2数据类型
             *     VALUEIN:v2数据类型
             *
             *     KEYOUT:k3数据类型
             *     VALUEOUT:v3数据类型
             */
            /*
                <k2,v2>  -->  <k3,v3>
                <hadoop,<1,1,1>>  --> <hadoop,3>
             */
            public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
                //重写reduce方法
        
                @Override
                protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
                    //遍历value集合将1进行相加  <1,1,1>  --> <3>
                    long count = 0;
                    for (LongWritable value : values) {
                        //注意value是LongWritable类型,需要调用get方法获取long类型
                        count += value.get();
                    }
                    //写入上下文环境
                    context.write(key, new LongWritable(count));
                }
            }
        
            //主方法
            public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
                //创建配置环境
                Configuration configuration = new Configuration();
                //创建job任务对象
                Job job = Job.getInstance(configuration, "WordCount");
                    //如果打包运行出错,则需要加该配置
                    job.setJarByClass(WordCount.class);
        
                //配置job对象(八个步骤)
                //1.指定文件读取方式和读取路径
                job.setInputFormatClass(TextInputFormat.class);
                TextInputFormat.setInputPaths(job, new Path("/wordcount"));
        
                //2.指定map阶段的处理方式和输出的数据类型
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
        
                //3. 4. 5. 6. 是shuffle阶段的分区、排序、规约、分组,使用默认方式。
        
                //7.指定reduce阶段的处理方式和输出的数据类型
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
        
                //8.指定输出文件的方式和输出路径
                job.setOutputFormatClass(TextOutputFormat.class);
                //判断输出路径是否存在,如果存在则删除,
                //获取FileSystem
                FileSystem fileSystem = FileSystem.get(new Configuration());
                Path path = new Path("/wordcount_out");
                //判断路径是否存在,如果存在则删除
                if (fileSystem.exists(path)){
                    fileSystem.delete(path, true);
                }
        
                TextOutputFormat.setOutputPath(job, path);
        
                //等待任务结束
                job.waitForCompletion(true);
            }
        }
        
        

最后

以上就是矮小夕阳为你收集整理的【Hadoop--MapReduce单词统计原理】学习目标:学习内容:的全部内容,希望文章能够帮你解决【Hadoop--MapReduce单词统计原理】学习目标:学习内容:所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部