Hadoop Learning 1: Job Execution Process

I have been working with Hadoop for over half a year, mainly using hadoop + hive for data analysis. Deployment and day-to-day use are no longer a problem, but I am still not very clear on the internals, so I plan to study the source code systematically from the beginning and record my questions and my understanding here as I go.

What follows is a WordCount debugging session:

Environment: Windows + Cygwin + Eclipse, pseudo-distributed mode (how to set up this environment, and the problems I hit along the way, will have to wait for another post; the focus here is how Hadoop actually runs a job).

Test data:

t1.txt:

hello world! hello ufida!
yes i do!
say something.

 

t2.txt:

cow is a cow.
word count job test.

 

 

The code being debugged:

package test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

public class WordCount {

    static Logger log = Logger.getLogger(WordCount.class);

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // key is the byte offset of the line within the file, value is the line
            log.info("map thread:" + Thread.currentThread().toString());
            log.info("map args:key:" + key.get() + ";value:" + value);
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                log.info("word:" + word.toString());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            log.info("reduce thread:" + Thread.currentThread().toString());
            String s = "";
            int sum = 0;
            while (values.hasNext()) {
                IntWritable i = values.next();
                s = s + "[" + i.get() + "]";
                sum += i.get();
            }
            log.info("reduce args:key:" + key.toString() + ";values:" + s);
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        log.info("Word count...");
        JobConf conf = new JobConf(WordCount.class);
        log.info("jar location:" + conf.getJar());
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        // Reduce doubles as the combiner, so it also runs on each map task's output
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("/temp/in"));
        FileOutputFormat.setOutputPath(conf, new Path("/temp/out"));

        JobClient.runJob(conf);
    }
}
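As an aside, the same job written against the newer org.apache.hadoop.mapreduce API (introduced in 0.20, and the direction the project is heading) would look roughly like the sketch below. It follows the canonical WordCount from Hadoop's bundled examples; I have not run this variant here, so treat it as a reference sketch rather than tested code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiWordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // emit (word, 1) for every token on the line
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum all the 1s for a word; also used as the combiner
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "wordcount");
        job.setJarByClass(NewApiWordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/temp/in"));
        FileOutputFormat.setOutputPath(job, new Path("/temp/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}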

Run log:

12/02/09 11:08:05 INFO test.WordCount: Word count...
12/02/09 11:08:05 INFO test.WordCount: jar location:D:\workspaces\eclipseWorkspace\.metadata\.plugins\org.apache.hadoop.eclipse\hadoopTest_WordCount.java-234599505300279609.jar
12/02/09 11:08:06 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/02/09 11:08:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/02/09 11:08:06 INFO mapred.FileInputFormat: Total input paths to process : 2
12/02/09 11:08:06 INFO mapred.JobClient: Running job: job_local_0001
12/02/09 11:08:06 INFO mapred.FileInputFormat: Total input paths to process : 2
12/02/09 11:08:06 INFO mapred.MapTask: numReduceTasks: 1
12/02/09 11:08:06 INFO mapred.MapTask: io.sort.mb = 100
12/02/09 11:08:06 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/09 11:08:06 INFO mapred.MapTask: record buffer = 262144/327680
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:0;value:hello world! hello ufida!
12/02/09 11:08:06 INFO test.WordCount: word:hello
12/02/09 11:08:06 INFO test.WordCount: word:world!
12/02/09 11:08:06 INFO test.WordCount: word:hello
12/02/09 11:08:06 INFO test.WordCount: word:ufida!
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:27;value:yes i do!
12/02/09 11:08:06 INFO test.WordCount: word:yes
12/02/09 11:08:06 INFO test.WordCount: word:i
12/02/09 11:08:06 INFO test.WordCount: word:do!
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:38;value:say something.
12/02/09 11:08:06 INFO test.WordCount: word:say
12/02/09 11:08:06 INFO test.WordCount: word:something.
12/02/09 11:08:06 INFO mapred.MapTask: Starting flush of map output
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:do!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:hello;values:[1][1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:i;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:say;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:something.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:ufida!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:world!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:yes;values:[1]
12/02/09 11:08:07 INFO mapred.MapTask: Finished spill 0
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/temp/in/t1.txt:0+52
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/02/09 11:08:07 INFO mapred.MapTask: numReduceTasks: 1
12/02/09 11:08:07 INFO mapred.MapTask: io.sort.mb = 100
12/02/09 11:08:07 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/09 11:08:07 INFO mapred.MapTask: record buffer = 262144/327680
12/02/09 11:08:07 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: map args:key:0;value:cow is a cow.
12/02/09 11:08:07 INFO test.WordCount: word:cow
12/02/09 11:08:07 INFO test.WordCount: word:is
12/02/09 11:08:07 INFO test.WordCount: word:a
12/02/09 11:08:07 INFO test.WordCount: word:cow.
12/02/09 11:08:07 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: map args:key:15;value:word count job test.
12/02/09 11:08:07 INFO test.WordCount: word:word
12/02/09 11:08:07 INFO test.WordCount: word:count
12/02/09 11:08:07 INFO test.WordCount: word:job
12/02/09 11:08:07 INFO test.WordCount: word:test.
12/02/09 11:08:07 INFO mapred.MapTask: Starting flush of map output
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:a;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:count;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:is;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:job;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:test.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:word;values:[1]
12/02/09 11:08:07 INFO mapred.MapTask: Finished spill 0
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/temp/in/t2.txt:0+35
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
12/02/09 11:08:07 INFO mapred.LocalJobRunner:
12/02/09 11:08:07 INFO mapred.Merger: Merging 2 sorted segments
12/02/09 11:08:07 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
12/02/09 11:08:07 INFO mapred.LocalJobRunner:
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:a;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:count;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:do!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:hello;values:[2]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:i;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:is;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:job;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:say;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:something.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:test.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:ufida!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:word;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:world!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:yes;values:[1]
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner:
12/02/09 11:08:07 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/02/09 11:08:07 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/temp/out
12/02/09 11:08:07 INFO mapred.LocalJobRunner: reduce > reduce
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/02/09 11:08:07 INFO mapred.JobClient: map 100% reduce 100%
12/02/09 11:08:07 INFO mapred.JobClient: Job complete: job_local_0001
12/02/09 11:08:07 INFO mapred.JobClient: Counters: 15
12/02/09 11:08:07 INFO mapred.JobClient:   FileSystemCounters
12/02/09 11:08:07 INFO mapred.JobClient:     FILE_BYTES_READ=62828
12/02/09 11:08:07 INFO mapred.JobClient:     HDFS_BYTES_READ=62311
12/02/09 11:08:07 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=63761
12/02/09 11:08:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=125860
12/02/09 11:08:07 INFO mapred.JobClient:   Map-Reduce Framework
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce input groups=16
12/02/09 11:08:07 INFO mapred.JobClient:     Combine output records=16
12/02/09 11:08:07 INFO mapred.JobClient:     Map input records=5
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce output records=16
12/02/09 11:08:07 INFO mapred.JobClient:     Spilled Records=32
12/02/09 11:08:07 INFO mapred.JobClient:     Map output bytes=154
12/02/09 11:08:07 INFO mapred.JobClient:     Map input bytes=87
12/02/09 11:08:07 INFO mapred.JobClient:     Combine input records=17
12/02/09 11:08:07 INFO mapred.JobClient:     Map output records=17
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce input records=16
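For reference, the output that the reduce task saved to hdfs://localhost:9000/temp/out (one reduce task, so normally a single part-00000 file) can be read straight off the reduce log above; TextOutputFormat writes each key and its count separated by a tab:

a	1
count	1
cow	1
cow.	1
do!	1
hello	2
i	1
is	1
job	1
say	1
something.	1
test.	1
ufida!	1
word	1
world!	1
yes	1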

I had assumed Hadoop would spawn many threads to run a job, but the log entry "Thread[Thread-14,5,main]" shows that one and the same thread ran everything. The "job_local_0001" id and the mapred.LocalJobRunner lines are a hint: the job never left the Eclipse JVM and ran under the local job runner, which executes tasks one after another. The tiny input probably matters too, since neither file comes anywhere near a block in size, so each file yields just one map task. I will dig into the source code later. Reading the log carefully, the rough execution flow is as follows (pseudocode; note that, per the log, the combiner runs when the map output is flushed, once per map task, not once per line):

checkNumberPath();              // check the number of input files (2)
for (i = 0; i < 2; i++) {       // one map task per input file
    Array lines = readFile(i);  // read all the lines of the file
    for (line : lines) {
        map();                  // split out the words, add them to the collector
    }
    combine();                  // runs at map-output flush, once per map task (see the log)
}
reduce();
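One more note on the single thread: in this Hadoop version (0.20.x), which runner executes the job is decided by the mapred.job.tracker setting, whose default value "local" selects LocalJobRunner, exactly what the job_local_0001 ids above indicate. A minimal sketch (the host:port value below is a placeholder, not my actual cluster address):

import org.apache.hadoop.mapred.JobConf;

public class RunnerChoice {
    public static void main(String[] args) {
        JobConf conf = new JobConf(WordCount.class);
        // "local" (the default) selects LocalJobRunner: tasks run serially
        // inside this JVM and jobs get ids like job_local_0001
        conf.set("mapred.job.tracker", "local");
        // To submit to a real cluster instead, point this at the JobTracker,
        // e.g. conf.set("mapred.job.tracker", "localhost:9001"); // placeholder
        System.out.println("runner: " + conf.get("mapred.job.tracker"));
    }
}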

The counters in the last few lines of the log also sketch the flow: the two map tasks read 5 input records (lines) and emitted 17 output records (words); the combiner took those 17 records in and, by collapsing the duplicate "hello" within the first map task, put 16 out; the reducer then saw 16 input records in 16 groups and wrote 16 outputs.
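To convince myself of that arithmetic, here is a small standalone check (plain Java, no Hadoop involved; the class name is mine): t1.txt tokenizes into 9 words and t2.txt into 8, for 17 map outputs, and each map task's combiner collapses duplicates within that task only, leaving 8 + 8 = 16:

import java.util.StringTokenizer;
import java.util.TreeMap;

public class CounterCheck {
    public static void main(String[] args) {
        // One string per input file, same contents as t1.txt and t2.txt above.
        String[] files = {
                "hello world! hello ufida! yes i do! say something.",
                "cow is a cow. word count job test." };
        int mapOutputRecords = 0;     // one (word, 1) pair per token
        int combineOutputRecords = 0; // distinct words per map task, summed
        for (String file : files) {
            TreeMap<String, Integer> perTask = new TreeMap<String, Integer>();
            StringTokenizer t = new StringTokenizer(file);
            while (t.hasMoreTokens()) {
                String w = t.nextToken();
                mapOutputRecords++;
                Integer c = perTask.get(w);
                perTask.put(w, c == null ? 1 : c + 1);
            }
            // the combiner collapses duplicate keys within a single map task's output
            combineOutputRecords += perTask.size();
        }
        System.out.println("Map output records = " + mapOutputRecords);          // 17
        System.out.println("Combine output records = " + combineOutputRecords); // 16
    }
}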
