Overview
Lu Chunli's work notes: who says programmers can't have an artistic side?
In MapReduce, the number of map tasks is determined by the input splits, while the number of reduce tasks is set explicitly on the job. What decides which reduce task each map output record is sent to is the partitioner, and that is the topic of this note. By default, MapReduce uses HashPartitioner.
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
The getPartition() method of HashPartitioner takes three parameters: key and value are the Mapper's output record, and numReduceTasks is the configured number of reducer tasks, which defaults to 1. The key's hashCode() is ANDed with Integer.MAX_VALUE to clear the sign bit, producing a non-negative integer, and that value is taken modulo numReduceTasks. Since any integer modulo 1 is 0, with the default single reducer getPartition(...) always returns 0: every Mapper output record goes to the same Reducer task and ends up in a single output file.
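To illustrate just the modulo arithmetic, here is a minimal stand-alone sketch with three reducers. Note that it uses String.hashCode() for simplicity; the real job keys are Text, whose hashCode() is computed over the UTF-8 bytes, so the actual partition numbers can differ.

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String key : new String[] {"FTP", "HTTP", "HTTPS"}) {
            // Same arithmetic as HashPartitioner: clear the sign bit, then take the modulo.
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}

With hash-based assignment, two or three of the protocols could easily land on the same reducer, which is why the example below plugs in an explicit partitioner.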
Example: counting URLs accessed over different protocols (five-tuple-style access log)
Raw data
[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08 www.subnetc1.com 192.168.1.7 80 192.168.1.139 18863 FTP www.subnetc1.com/index.html
2013-09-13 16:04:08 www.subnetc2.com 192.168.1.7 80 192.168.1.159 14100 HTTP www.subnetc2.com/index.html
2013-09-13 16:04:08 www.subnetc3.com 192.168.1.7 80 192.168.1.130 4927 HTTPS www.subnetc3.com/index.html
2013-09-13 16:04:08 www.subnetc4.com 192.168.1.7 80 192.168.1.154 39044 HTTP www.subnetc4.com/index.html
[hadoop@nnode code]$
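The Mapper below splits each line on tab characters and expects exactly 8 fields, keying on field 6 (the protocol) and emitting field 7 (the URL) as the value. A quick local check of that layout, assuming the log file is tab-separated (the sample line here is a hand-built reconstruction of the first record, not taken from the actual file):

public class FieldLayoutCheck {
    public static void main(String[] args) {
        String line = "2013-09-13 16:04:08\twww.subnetc1.com\t192.168.1.7\t80"
                + "\t192.168.1.139\t18863\tFTP\twww.subnetc1.com/index.html";
        String[] fields = line.split("\t");
        System.out.println(fields.length);                   // 8
        System.out.println(fields[6] + " -> " + fields[7]);  // FTP -> www.subnetc1.com/index.html
    }
}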
Implementing the Mapper
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper implementation
 */
public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each log line is tab-separated and must contain exactly 8 fields.
        String[] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());    // protocol (FTP / HTTP / HTTPS)
        newValue.set(values[7].trim());  // requested URL
        context.write(newKey, newValue);
    }
}
Implementing the Reducer
package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer implementation
 */
public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all URLs seen for this protocol, separated by ';'.
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}
Implementing the Partitioner
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author luchunli
 * @description Custom partitioner: route each protocol to its own reducer
 */
public class ProtocolPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        if (key.toString().equals("FTP")) {
            return 0;
        }
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }
}
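Before submitting the job, the mapping can be sanity-checked by calling getPartition() directly. This ProtocolPartitionerCheck class is a hypothetical helper, not part of the original post; it assumes the same package and the Hadoop libraries on the classpath.

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;

public class ProtocolPartitionerCheck {
    public static void main(String[] args) {
        ProtocolPartitioner partitioner = new ProtocolPartitioner();
        int numReduceTasks = 3;  // must match job.setNumReduceTasks(3) in the driver
        for (String protocol : new String[] {"FTP", "HTTP", "HTTPS"}) {
            // With the default output format, partition N is written to part-r-0000N.
            int partition = partitioner.getPartition(new Text(protocol), new Text(""), numReduceTasks);
            System.out.println(protocol + " -> part-r-0000" + partition);
        }
    }
}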
Implementing the driver class
package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(ProtocolDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the number of reduce tasks; it must cover every value
        // the partitioner can return (0, 1, 2).
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);

        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Enable the custom OutputFormat (used in the second run below).
        // job.setOutputFormatClass(ProtocolOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Running the job
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=158
		FILE: Number of bytes written=431827
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=532
		HDFS: Number of bytes written=130
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters
		Killed reduce tasks=1
		Launched map tasks=1
		Launched reduce tasks=4
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=26277
		Total time spent by all reduces in occupied slots (ms)=105054
		Total time spent by all map tasks (ms)=26277
		Total time spent by all reduce tasks (ms)=105054
		Total vcore-seconds taken by all map tasks=26277
		Total vcore-seconds taken by all reduce tasks=105054
		Total megabyte-seconds taken by all map tasks=26907648
		Total megabyte-seconds taken by all reduce tasks=107575296
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Map output bytes=132
		Map output materialized bytes=158
		Input split bytes=109
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=158
		Reduce input records=4
		Reduce output records=3
		Spilled Records=8
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=410
		CPU time spent (ms)=4360
		Physical memory (bytes) snapshot=515862528
		Virtual memory (bytes) snapshot=3399213056
		Total committed heap usage (bytes)=167907328
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=423
	File Output Format Counters
		Bytes Written=130
[hadoop@nnode code]$
Checking the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$
The part-r-0000N file names above are generated automatically by MapReduce. By supplying a custom OutputFormat we can control the names of the output files ourselves.
The custom OutputFormat is shown below. Unlike the earlier MultipleWorkCount example, which went through LineRecordWriter, this one writes directly through the FSDataOutputStream obtained from FileSystem.create().
package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author luchunli
 * @description Custom OutputFormat: one output file per protocol key
 */
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {
    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext context = null;
        // One open stream per key, so each protocol gets its own file.
        protected HashMap<Text, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {}

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<Text, DataOutputStream>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            DataOutputStream out = recordStream.get(key);
            if (null == out) {
                // Name the file after the key, e.g. FTP.txt, HTTP.txt, HTTPS.txt.
                Path file = new Path(workPath, key + ".txt");
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                // Store a copy of the key: the framework reuses Text instances between groups.
                recordStream.put(new Text(key), out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write("\t".getBytes(utf8));
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Fall back to the job's output directory.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        return workPath;
    }
}
Running again (with job.setOutputFormatClass(ProtocolOutputFormat.class) enabled in the driver)
[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=158
		FILE: Number of bytes written=432595
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=532
		HDFS: Number of bytes written=130
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters
		Killed reduce tasks=1
		Launched map tasks=1
		Launched reduce tasks=4
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=26075
		Total time spent by all reduces in occupied slots (ms)=92427
		Total time spent by all map tasks (ms)=26075
		Total time spent by all reduce tasks (ms)=92427
		Total vcore-seconds taken by all map tasks=26075
		Total vcore-seconds taken by all reduce tasks=92427
		Total megabyte-seconds taken by all map tasks=26700800
		Total megabyte-seconds taken by all reduce tasks=94645248
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Map output bytes=132
		Map output materialized bytes=158
		Input split bytes=109
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=158
		Reduce input records=4
		Reduce output records=3
		Spilled Records=8
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=339
		CPU time spent (ms)=4690
		Physical memory (bytes) snapshot=513667072
		Virtual memory (bytes) snapshot=3405312000
		Total committed heap usage (bytes)=167907328
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=423
	File Output Format Counters
		Bytes Written=130
[hadoop@nnode code]$
Checking the results
[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:00 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP     www.subnetc1.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP    www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS   www.subnetc3.com/index.html;
[hadoop@nnode code]$
Reposted from: https://blog.51cto.com/luchunli/1719916