我是靠谱客的博主 幸福蚂蚁,这篇文章主要介绍Hadoop2.6.0学习笔记(七)MapReduce分区,现在分享给大家,希望可以做个参考。

鲁春利的工作笔记,谁说程序员不能有文艺范?



MapReduce中map task任务的数量是由spli分片决定,那么reduce task的数量由什么来确定的呢?就是这里要讨论的MapReduce分区。默认情况下,MapReduce中使用的是HashPartitioner。

复制代码
1
2
3
4
5
6
7
/** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitioner<K, V> {   /** Use {@link Object#hashCode()} to partition. */   public int getPartition(K key, V value, int numReduceTasks) {     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;   } }

在HashPartitioner中getPartition()方法有三个形参,key、value分别指的是Mapper任务的输出,numReduceTasks指的是设置的Reducer任务数量,默认值是1。通过取key的hashCode,然后通过和Integer.MAX_VALUE与运算被转换为一个非负整数,任何整数与1相除的余数肯定是0。也就是说getPartition(…)方法的返回值总是0,也就是Mapper任务的输出总是送给一个Reducer任务,最终只能输出到一个文件中。


示例:对于通过不同协议访问某些url数据进行统计(日志五元组)

原始数据

复制代码
1
2
3
4
5
6
[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt 2013-09-13 16:04:08     www.subnetc1.com        192.168.1.7     80      192.168.1.139   18863   FTP     www.subnetc1.com/index.html 2013-09-13 16:04:08     www.subnetc2.com        192.168.1.7     80      192.168.1.159   14100   HTTP    www.subnetc2.com/index.html 2013-09-13 16:04:08     www.subnetc3.com        192.168.1.7     80      192.168.1.130   4927    HTTPS   www.subnetc3.com/index.html 2013-09-13 16:04:08     www.subnetc4.com        192.168.1.7     80      192.168.1.154   39044   HTTP    www.subnetc4.com/index.html [hadoop@nnode code]$


实现Mapper

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.lucl.hadoop.mapreduce.part; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /**  *   * @author luchunli  * @description 实现Mapper  *  */ public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {     @Override     protected void map(LongWritable key, Text value, Context context)             throws IOException, InterruptedException {         String [] values = value.toString().split("t");         if (null == values || values.length != 8) {             return;         }         Text newKey = new Text();         Text newValue = new Text();         newKey.set(values[6].trim());         newValue.set(values[7].trim());                  context.write(newKey, newValue);     } }


实现Reducer

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.lucl.hadoop.mapreduce.part; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /**  *   * @author luchunli  * @description 实现Reducer  *  */ public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {     @Override     protected void reduce(Text key, Iterable<Text> values, Context context)             throws IOException, InterruptedException {         StringBuffer sbf = new StringBuffer();         for (Text text : values) {             sbf.append(text.toString());             sbf.append(";");         }         context.write(key, new Text(sbf.toString()));     } }


实现Partitioner

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lucl.hadoop.mapreduce.part; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /**  *   * @author luchunli  * @description 自定义分区类  *  */ public class ProtocolPartitioner extends Partitioner<Text, Text> {     @Override     public int getPartition(Text key, Text value, int numReduceTasks) {         if (key.toString().equals("FTP")) {             return 0;         }          if (key.toString().equals("HTTP")) {             return 1;         }         if (key.toString().equals("HTTPS")) {             return 2;         }         return 0;     } }


实现驱动器类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.lucl.hadoop.mapreduce.part; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ProtocolDriver extends Configured implements Tool {     public static void main(String[] args) {         try {             ToolRunner.run(new ProtocolDriver(), args);         } catch (Exception e) {             e.printStackTrace();         }     }          @Override     public int run(String[] args) throws Exception {         Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());                  job.setJarByClass(ProtocolDriver.class);                  FileInputFormat.addInputPath(job, new Path(args[0]));                  job.setMapperClass(ProtocolMapper.class);         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(Text.class);                  // 设置task reduce的个数         job.setNumReduceTasks(3);         job.setPartitionerClass(ProtocolPartitioner.class);                  job.setReducerClass(ProtocolReducer.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(Text.class);                  // job.setOutputFormatClass(ProtocolOutputFormat.class);                  FileOutputFormat.setOutputPath(job, new Path(args[1]));                  return job.waitForCompletion(true) ? 0 : 1;     } }


调用执行

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018 15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032 15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1 15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1 15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008 15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008 15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/ 15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008 15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false 15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0% 15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0% 15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33% 15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100% 15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully 15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50         File System Counters                 FILE: Number of bytes read=158                 FILE: Number of bytes written=431827                 FILE: Number of read operations=0                 FILE: Number of large read operations=0                 FILE: Number of write operations=0                 HDFS: Number of bytes read=532                 HDFS: Number of bytes written=130                 HDFS: Number of read operations=12                 HDFS: Number of large read operations=0                 HDFS: Number of write operations=6         Job Counters                  Killed reduce tasks=1                 Launched map tasks=1                 Launched reduce tasks=4                 Data-local map tasks=1                 Total time spent by all maps in occupied slots (ms)=26277                 Total time spent by all reduces in occupied slots (ms)=105054                 Total time spent by all map tasks (ms)=26277                 Total time spent by all reduce tasks (ms)=105054                 Total vcore-seconds taken by all map tasks=26277                 Total vcore-seconds taken by all reduce tasks=105054                 Total megabyte-seconds taken by all map tasks=26907648                 Total megabyte-seconds taken by all reduce tasks=107575296         Map-Reduce Framework                 Map input records=4                 Map output records=4                 Map output bytes=132                 Map output materialized bytes=158                 Input split bytes=109                 Combine input records=0                 Combine output records=0                 Reduce input groups=3                 Reduce shuffle bytes=158                 Reduce input records=4                 Reduce output records=3                 Spilled Records=8                 Shuffled Maps =3                 Failed Shuffles=0                 Merged Map outputs=3                 GC time elapsed (ms)=410                 CPU time spent (ms)=4360                 Physical memory (bytes) snapshot=515862528                 Virtual memory (bytes) snapshot=3399213056                 Total committed heap usage (bytes)=167907328         Shuffle Errors                 BAD_ID=0                 CONNECTION=0                 IO_ERROR=0                 WRONG_LENGTH=0                 WRONG_MAP=0                 WRONG_REDUCE=0         File Input Format Counters                  Bytes Read=423         File Output Format Counters                  Bytes Written=130 [hadoop@nnode code]$


查看结果

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
[hadoop@nnode code]$ hdfs dfs -ls /2015120500018 Found 4 items -rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS -rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000 -rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001 -rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002 [hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000 FTP     www.subnetc1.com/index.html; [hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001 HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html; [hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002 HTTPS   www.subnetc3.com/index.html; [hadoop@nnode code]$


上述生成的文件命名格式是MapReduce根据任务自动生成的,我们可以通过自定义OutputFormat来自定义输出文件的名称。

wkiom1zi7yedcmo2aahhvkujuto984.jpg


自定义的OutputFormat代码如下,这里和之前的MultipleWorkCount的区别在于本示例中直接通过FSDataOutputStream来实现,而不是之前调用LineRecordWriter的方式。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.lucl.hadoop.mapreduce.part; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.HashMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /**  *   * @author luchunli  * @description 自定义OutputFormat  */ public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {     protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {         private static final String utf8 = "UTF-8";         private static final byte[] newline;         static {           try {             newline = "n".getBytes(utf8);           } catch (UnsupportedEncodingException uee) {             throw new IllegalArgumentException("can't find " + utf8 + " encoding");           }         }                  protected TaskAttemptContext context = null;                  protected HashMap<Text, DataOutputStream> recordStream = null;         protected Path workPath = null;                  public ProtocolRecordWriter () {}                  public ProtocolRecordWriter (TaskAttemptContext context, Path workPath) {             this.context = context;             this.workPath = workPath;             recordStream = new HashMap<Text, DataOutputStream>();         }         @Override         public void write(Text key, Text value) throws IOException, InterruptedException {               boolean nullKey = key == null;               boolean nullValue = value == null;               if (nullKey && nullValue) {                 return;               }               DataOutputStream out = recordStream.get(key);               if (null == out) {                   Path file = new Path(workPath, key + ".txt");                   out = file.getFileSystem(this.context.getConfiguration()).create(file, false);                   recordStream.put(key, out);               }               if (!nullKey) {                  out.write(key.getBytes(), 0, key.getLength());               }               if (!(nullKey || nullValue)) {                 out.write("t".getBytes());               }               if (!nullValue) {                  out.write(value.getBytes(), 0, value.getLength());               }               out.write(newline);         }         @Override         public void close(TaskAttemptContext context) throws IOException,                 InterruptedException {             for (DataOutputStream out : recordStream.values()) {                 out.close();             }             recordStream.clear();             recordStream = null;         }     }           @Override     public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)             throws IOException, InterruptedException {         Path workPath = this.getTaskOutputPath(context);         return new ProtocolRecordWriter(context, workPath);     }          private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {         Path workPath = null;         OutputCommitter committer = super.getOutputCommitter(context);                  if (committer instanceof FileOutputCommitter) {             // Get the directory that the task should write results into.             workPath = ((FileOutputCommitter) committer).getWorkPath();         } else {             // Get the {@link Path} to the output directory for the map-reduce job.             // context.getConfiguration().get(FileOutputFormat.OUTDIR);             Path outputPath = super.getOutputPath(context);             if (null == outputPath) {                 throw new IOException("Undefined job output-path.");             }             workPath = outputPath;         }                  return workPath;     } }


再次运行

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020 15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032 15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1 15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1 15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010 15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010 15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/ 15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010 15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false 15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0% 15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0% 15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33% 15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100% 15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully 15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50         File System Counters                 FILE: Number of bytes read=158                 FILE: Number of bytes written=432595                 FILE: Number of read operations=0                 FILE: Number of large read operations=0                 FILE: Number of write operations=0                 HDFS: Number of bytes read=532                 HDFS: Number of bytes written=130                 HDFS: Number of read operations=12                 HDFS: Number of large read operations=0                 HDFS: Number of write operations=6         Job Counters                  Killed reduce tasks=1                 Launched map tasks=1                 Launched reduce tasks=4                 Data-local map tasks=1                 Total time spent by all maps in occupied slots (ms)=26075                 Total time spent by all reduces in occupied slots (ms)=92427                 Total time spent by all map tasks (ms)=26075                 Total time spent by all reduce tasks (ms)=92427                 Total vcore-seconds taken by all map tasks=26075                 Total vcore-seconds taken by all reduce tasks=92427                 Total megabyte-seconds taken by all map tasks=26700800                 Total megabyte-seconds taken by all reduce tasks=94645248         Map-Reduce Framework                 Map input records=4                 Map output records=4                 Map output bytes=132                 Map output materialized bytes=158                 Input split bytes=109                 Combine input records=0                 Combine output records=0                 Reduce input groups=3                 Reduce shuffle bytes=158                 Reduce input records=4                 Reduce output records=3                 Spilled Records=8                 Shuffled Maps =3                 Failed Shuffles=0                 Merged Map outputs=3                 GC time elapsed (ms)=339                 CPU time spent (ms)=4690                 Physical memory (bytes) snapshot=513667072                 Virtual memory (bytes) snapshot=3405312000                 Total committed heap usage (bytes)=167907328         Shuffle Errors                 BAD_ID=0                 CONNECTION=0                 IO_ERROR=0                 WRONG_LENGTH=0                 WRONG_MAP=0                 WRONG_REDUCE=0         File Input Format Counters                  Bytes Read=423         File Output Format Counters                  Bytes Written=130 [hadoop@nnode code]$


查看结果

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
[hadoop@nnode code]$ hdfs dfs -ls /2015120500020 Found 4 items -rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt -rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt -rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt -rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS [hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt FTP     www.subnetc1.com/index.html; [hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html; [hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt HTTPS   www.subnetc3.com/index.html; [hadoop@nnode code]$


转载于:https://blog.51cto.com/luchunli/1719916

最后

以上就是幸福蚂蚁最近收集整理的关于Hadoop2.6.0学习笔记(七)MapReduce分区的全部内容,更多相关Hadoop2内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部