我是靠谱客的博主 大意小甜瓜,最近开发中收集的这篇文章主要介绍Hadoop学习笔记(3):MapReduce初探Hadoop学习笔记(3):MapReduce初探,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Hadoop学习笔记(3):MapReduce初探

 

1. 借助MapReduce编写简单的词频统计程序

  1. 编写一个简单的MapReduce程序需要三个步骤:

    1. 编写Map处理逻辑

    2. 编写Reduce处理逻辑

    3. 编写main方法

    在进一步学习MapReduce之前,先使用Hadoop(版本号:2.7.7)的MapReduce实现一个简单的词频统计程序,以建立一个直观的印象,实验步骤如下:

  2. 在eclipse中新建项目、导入hadoop中必要的jar文件,新建一个类WordConttest,然后编写Map处理逻辑:

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
            public TokenizerMapper() {
                
        }
            
            private static final IntWritable one = new IntWritable(1);
            private Text word = new Text();
            // 通过重写map实现map处理逻辑
            public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException{
                StringTokenizer itr = new StringTokenizer(value.toString());  //
                while(itr.hasMoreTokens()) {
                    this.word.set(itr.nextToken());
                    context.write(this.word, one);
                }
            }
        }

     

  3. 编写Reduce处理逻辑:

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
            public IntSumReducer() {
                
            }
            
            private IntWritable result = new IntWritable();
            // 通过重写reduce实现reduce处理逻辑
            public void reduce(Text key, Iterable<IntWritable>values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException{
                int sum = 0;
                IntWritable val;
                java.util.Iterator<IntWritable> itr = values.iterator();
                while (itr.hasNext()) {
                    val = (IntWritable)itr.next();
                    sum+=val.get();
                }
                this.result.set(sum);
                context.write(key, this.result);
            }
        }
    • 上面分别继承了org.apache.hadoop.mapreduce.Mapper和org.apache.hadoop.mapreduce.Reducer实际上,在编写Map处理逻辑和Reduce处理逻辑时,就是重写Mapper的map()方法和Reducer的reduce()方法

  4. 然后我们编写main函数:

    //程序以jar命令在命令行中运行,需要接收两个以上的启动参数,最后一个参数为输出文件位置,其余的作为输入文件
    public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            Configuration conf = new Configuration();
            String[] otherArgs=(new GenericOptionsParser(conf, args)).getRemainingArgs();
            if(otherArgs.length<2) {
                System.err.println("Usage :wordcount<in>[<in>...]<out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCounttest.class);
            job.setMapperClass(WordCounttest.TokenizerMapper.class);
            job.setReducerClass(WordCounttest.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //遍历启动参数,设置输入文件目录
            for(int i=0; i<otherArgs.length-1;++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            //设置输出文件目录
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
            System.exit(job.waitForCompletion(true)?0:1);
            
    ​
        }
  5. 完整的词频统计程序如下:

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    ​
    public class WordCounttest {
        public WordCounttest() {}
    ​
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            Configuration conf = new Configuration();
            String[] otherArgs=(new GenericOptionsParser(conf, args)).getRemainingArgs();
            if(otherArgs.length<2) {
                System.err.println("Usage :wordcount<in>[<in>...]<out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCounttest.class);
            job.setMapperClass(WordCounttest.TokenizerMapper.class);
            job.setReducerClass(WordCounttest.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for(int i=0; i<otherArgs.length-1;++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
            System.exit(job.waitForCompletion(true)?0:1);
            
    ​
        }
    ​
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
            public TokenizerMapper() {
                
        }
            
            private static final IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException{
                StringTokenizer itr = new StringTokenizer(value.toString());  //
                while(itr.hasMoreTokens()) {
                    this.word.set(itr.nextToken());
                    context.write(this.word, one);
                }
            }
        }
        
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
            public IntSumReducer() {
                
            }
            
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable>values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException{
                int sum = 0;
                IntWritable val;
                java.util.Iterator<IntWritable> itr = values.iterator();
                while (itr.hasNext()) {
                    val = (IntWritable)itr.next();
                    sum+=val.get();
                }
                this.result.set(sum);
                context.write(key, this.result);
            }
        }
    ​
    }
  6. 然后我们将程序打包为“可执行程序”,存放在hadoop/myapp/WordCounttest.jar,新建两个包含多个单词的txt文件并上传到HDFS服务器,存放在input/,设置输出文件目录为output,然后我们可以通过以下命令运行程序:

    ./bin/hadoop jar ./myapp/WordCount.jar WordCounttest input output

    WordCounttest为我们要运行的类名,书上没指定,运行成功、结束后,可以查看词频统计结果:

    hdfs dfs -cat output/*

  7. 小结:map环节继承Mapper类,并重写map()方法;reduce环节继承Reducer类,并重写reduce()方法;main()中主要构造了一个Job实例,并做了一系列的设置,最后设置了job的输入/输出文件目录

    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));

 

2. MapReduce的进一步探索

搜索Hadoop2.7.7的官方文档,查看MapReduce使用说明(MapReduce Tutorial):http://hadoop.apache.org/docs/r2.7.7/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

如果英文版看着费劲,也可以凑合早期版本Hadoop1.0.4的MapReduce中文版指南,大部分地方差不多:http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html#%E7%9B%AE%E7%9A%84

注:后文的引用部分翻译自hadoop2.7.7的官方文档

1. 使用MapReduce的前提

  • Prerequisites:在使用MapReduce之前,需确保Hadoop被正确配置、安装并运行,具体的,Hadoop支持3种运行方式:离线式、伪分布式(一个节点同时包含namenode和datanode)和分布式(多节点集群),推荐新手使用单节点集群。

2. MapReduce概述

  • OverView(hadoop 2.7.7)

    Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

    一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,这个切分由 map任务(task)以完全并行的方式处理。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

    通常,Map/Reduce框架和分布式文件系统( Hadoop Distributed File System,HDFS)是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。

    Map/Reduce框架由一个单独的master ResourceManager 、每个cluster-node一个slave NodeManager、每个application一个MRAppMaster共同组成( 参考 YARN Architecture Guide)。

    应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给ResourceManager,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

    虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。

    • Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (例如:Shell工具)来做为mapper和reducer。

    • Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。

    要点如下:

    • MapReduce的工作方式:map任务以并行方式切分数据块,然后框架对map的输出进行排序,并作为reduce任务的输入,整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

    • M/R框架和HDFS运行在同一组节点上

    • 编写M/R应用程序必须要做的事情:

      1. 指明输入/输出路径

      2. 通过继承合适的接口或抽象类并实现它们提供的map和reduce函数

      3. 配置其它的作业参数,构成作业配置

    • MapReduce不是Hadoop中唯一的MR工具

3. MapReduce的输入和输出

Map/Reduce框架运转在<key, value> 键值对上,也就是说, 框架把作业的输入看为是一组<key, value> 键值对,同样也产出一组 <key, value> 键值对做为作业的输出,这两组键值对的类型可能不同。

框架需要对key和value的类(classes)进行序列化操作, 因此,这些类需要实现 Writable接口。 另外,为了方便框架执行排序操作,key类必须实现 WritableComparable接口。

一个MapReduce作业的输入输出类型如下所示:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

注:两个版本这一小节(2.7.7和1.0.4)是一样的

4.应用示例

代码和书上的一模一样,但是在运行结果后面还有两段关于命令行参数说明:

应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递以逗号分隔的档案文件列表(comma separated list of archives)做为参数,这些档案文件会被解压在task的当前工作目录下并且创建一个指向解压目录的符号链接(以压缩包的名字命名)。 有关命令行选项的更多细节请参考 Commands Guide

例子:

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

在上面的例子中,myarchive.zip将被放置并解压到"myarchive.zip"文件中,用户也可以使用#来指定files和archives的名称:

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

在这个例子中,task可以通过dir1和dir2分别访问dir1/dict.txtdir2/dict.txt,并且'mytar.tgz'会被放置并解压到“tgzdir”中,三个参数的说明如下:

选项描述
-files <逗号分隔的文件列表>指定要拷贝到map reduce集群的文件的逗号分隔的列表。 只适用于job。
-libjars <逗号分隔的jar列表>指定要包含到classpath中的jar文件的逗号分隔的列表。 只适用于job。
-archives <逗号分隔的archive列表>指定要被解压到计算节点上的档案文件的逗号分割的列表。 只适用于job。

 

上面提到的Archives,Archive似乎可以用来聚集大量小文件来提升运行效率,因为当要处理大量小文件时,这些小文件往往会占用大量的datanode,把它们聚合起来一起处理可以极大地提高效率。上面提到的Archives应该只是普通的压缩文件,因为Hadoop archives文件的后缀是.har。下面摘一段Hadoop archives的介绍

Hadoop archives(Hadoop archives)是一种特殊格式的archives(档案文件),一个Hadoop archive映射一个文件系统路径,且它的扩展名中始终包含*.har,一个Hadoop archive路径包含metadata(形式为:_index and _masterindex)和data(part-*)文件。其中_index文件中包含文件的名称(这些文件是archive的一部分)以及它们在archive中的位置,详情参考Hadoop Archives Guide

 

5. WordCount V1.0的工作过程

WordCount程序非常直接明了:

public void map(Object key, Text value, Context context
             ) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
 word.set(itr.nextToken());
 context.write(word, one);
}
}
  • 首先,通过继承Mapper并重写map(),每次处理一行数据,这些数据由指定的TextInputFormat提供。

  • 然后,程序通过StringTokenizer,以空格为切分符,把每一行数据按照切分为单词,并且发出键-值对<<word>,1>。

对于示例输入,作为输入的文件内容如下:

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

第一个map发出:

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二个map发出:

< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

关于一个job的map数量如何产生、以及如何以细粒度的方式控制它们,在稍后以部分介绍。

WordCount还指定了一个combiner。因此,每一个map的输出会通过local combiner(按照作业的配置,和reducer 一样)在本地进行聚合,然后会根据key进行排序:

第一个map的输出:

< Bye, 1>
< Hello, 1>
< World, 2>`

第二个map的输出:

< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>`

然后是Reducer:

public void reduce(Text key, Iterable&lt;IntWritable&gt; values,
                  Context context
                  ) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable val : values) {
   sum += val.get();
 }
 result.set(sum);
 context.write(key, result);
}

在Reducer的实现中,通过reduce()方法,只是简单地对每一个key(本例中是word)进行计数求和。

所以这个作业的输出是:

< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>`

在main()函数中制定了job的几个方面,如:通过Command line传递过来的输入/输出路径,key/value的类型、输入/输出的格式等。随后,程序调用job.waitForCompletion提交作业并且监视它的执行

 

6. 小结

  • MapReduce是运行在<key,value>形式数据集的软件框架

  • 从inout到output:

    • map任务处理输入,并通过context.write()发出(emit)处理过的<key,value>

    • map的输出经由combiner进行聚合后,根据key排序

    • 有序的<key,value>集合作为reduce任务的输入,处理后通过context.write()产生最终输出

  • 到此,对MapReduce的工作过程有了一个比较清晰的理解,MapReduce还有很多知识点,还要继续学习。

转载于:https://my.oschina.net/u/4126607/blog/3050578

最后

以上就是大意小甜瓜为你收集整理的Hadoop学习笔记(3):MapReduce初探Hadoop学习笔记(3):MapReduce初探的全部内容,希望文章能够帮你解决Hadoop学习笔记(3):MapReduce初探Hadoop学习笔记(3):MapReduce初探所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部