我是靠谱客的博主 过时溪流,这篇文章主要介绍MapReduce编程模型MapReduce/Hadoop为什么会有MapReduce?MapReduce数据模型,现在分享给大家,希望可以做个参考。

MapReduce/Hadoop

MapReduce是目前云计算中最广泛使用的计算模型,由Google于2004年提出,谷歌关于云计算有三篇著名的论文:

  1. 《Bigtable_A Distributed Storage System for Structured Data》
  2. 《MapReduce: Simplied Data Processing on Large Clusters》
  3. 《The Google File System》

论文下载地址:http://pan.baidu.com/s/1o6G8PGA
Hadoop是MapReduce的一个开源实现,核心框架有2个:HDFS和MapReduce,HDFS为海量数据提供存储,MapReduce为海量数据提供计算。

为什么会有MapReduce?

在MapReduce提出之前,编写并行分布式程序需要下列技术:

  • Multi‐threading( 多线程编程)
  • Socket programming(socket网络编程)
  • Data distribution( 数据分发)
  • Job distribution, coordination, load balancing(任务分发、协调、负载平衡)
  • Fault tolerance( 容错性能)
  • Debugging( 调试)

上面每个方面都需要学习和经验积累,要想编写并行分布式程序并不容易,需要非常有经验的程序员和调试技巧,调试分布式系统很花时间和精力。为了解决这一问题,提出来解决思路:

  • 程序员写串行程序,编程序时不需要思考并行的问题,调试时只需要保证串行执行正确。
  • 由系统完成并行分布式地执行,负责并行分布执行的正确性和效率

但是这样也带来问题:牺牲了程序的功能。直接进行并行分布式编程,可以完成各种各样丰富的功能,而一个编程模型实际上是限定了程序的功能类型。因此要求系统的编程模型必须有代表性,必须代表一大类重要的应用才有生命力。

MapReduce的核心思想是分而治之,把大的任务分成若干个小任务,并行执行小任务,最后把所有的结果汇总。

MapReduce数据模型

MapReduce的数据模型:

  • <key, value>
  • 数据由一条一条的记录组成
  • 记录之间是无序的
  • 每一条记录有一个key,和一个value
  • key: 可以不唯一
  • key与value的具体类型和内部结构由程序员决定,系统基
    本上把它们看作黑匣

图解:
这里写图片描述

下面以wordcount为例说明MapReduce计算过程:
输入文本:

复制代码
1
hello world hadoop hdfs hadoop hello hadoop hdfs

map输出:

复制代码
1
2
3
4
5
6
7
8
<hello,1> <world,1> <hadoop,1> <hdfs,1> <hadoop,1> <hello,1> <hadoop,1> <hdfs,1>

shuffle(洗牌)过程把key值相同的value合并成list作为reduce输入:

复制代码
1
2
3
4
<hello,<1,1>> <world,1> <hadoop,<111>> <hdfs,<1,1>>

reduce输出:

复制代码
1
2
3
4
<hello,2> <world,1> <hadoop,3> <hdfs,1>

关于Wordcount运行例子可以参考hadoop helloworld(wordcount),代码解读博客园上有一篇很详细的文章Hadoop集群(第6期)_WordCount运行详解.

附wordcount源码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { // This is the Mapper class // reference: http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Mapper.html // public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumCombiner extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } // This is the Reducer class // reference http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Reducer.html // // We want to control the output format to look at the following: // // count of word = count // public static class IntSumReducer extends Reducer<Text,IntWritable,Text,Text> { private Text result_key= new Text(); private Text result_value= new Text(); private byte[] prefix; private byte[] suffix; protected void setup(Context context) { try { prefix= Text.encode("count of ").array(); suffix= Text.encode(" =").array(); } catch (Exception e) { prefix = suffix = new byte[0]; } } public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } // generate result key result_key.set(prefix); result_key.append(key.getBytes(), 0, key.getLength()); result_key.append(suffix, 0, suffix.length); // generate result value result_value.set(Integer.toString(sum)); context.write(result_key, result_value); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumCombiner.class); job.setReducerClass(IntSumReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // add the input paths as given by command line for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } // add the output path as given by the command line FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

最后

以上就是过时溪流最近收集整理的关于MapReduce编程模型MapReduce/Hadoop为什么会有MapReduce?MapReduce数据模型的全部内容,更多相关MapReduce编程模型MapReduce/Hadoop为什么会有MapReduce内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部