我是靠谱客的博主 听话马里奥,最近开发中收集的这篇文章主要介绍MR-2.输入格式(InputFormat)CombineFileInputFormat源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Hadoop处理少量的大文件比处理大量的小文件更好,主要因为FileInputFormat对单个文件会至少生成一个InputSplit。若文件比HDFS 的Block小的话,将产生多个InputSplit,让多个MapTask任务处理。

 

解决方案,通过CombineFileInputFormat将多个小文件封装,形成一个大InputSplit,然后maptask处理封装后的InputSplit。

 

当然,如果有可能,建议不要存储小文件到hdfs上,若是这样的话,还会占用大量namenode内存。一个可以减少大量小文件的方法使用SequenceFile将大量的小文件合并成一个或者多个大文件,文件名为k,内容为v。但是如果HDFS中已经存储大量的小文件,最好使用CombineFileInputFormat。

注意:

(1)多存储HDFS前,产生多个小文件,可以通过SequenceFile封装,然后存储HDFS

采用SequenceFile示例如下:

定义InputFormat

/**
 * 用户自定义FileInputFormat,对Inputsplit不可切分,并且仅生成一个record的k/v键值对
 * 
 * @author shenfl
 *
 */
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

	/**
	 * InputSplit split 仅生成一个record的k/v键值对
	 */
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException, InterruptedException {
		 WholeRecordReader reader = new WholeRecordReader();
		 reader.initialize(split, context);
		 return reader;
	}

	/**
	 * 文件filename不可切分
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
}

定义Job

/**
 * 存储HDFS之前把大量大文件合并后在存储HDFS
 * input/smallfiles input/smallfiles_out
 * @author shenfl
 *
 */
public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

	static class SequenceFileMapper extends
			Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

		Text filenameKey = new Text();

		/**
		 * 由于使用WholeFileInputFormat,map任务仅1个,这个setup整个集群仅被调用一次
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {

			InputSplit inputSplit = context.getInputSplit();
			Path path = ((FileSplit) inputSplit).getPath();
			filenameKey = new Text(path.getName());
		}
		
		@Override
		protected void map(NullWritable key, BytesWritable value, Context context)
				throws IOException, InterruptedException {
			context.write(filenameKey, value);
		}
	}

	public int run(String[] args) throws Exception {

		Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (job == null) {
			return -1;
		}
		
		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		job.setMapperClass(SequenceFileMapper.class);
		job.setReducerClass(Reducer.class);//reducer使用默认的类

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);

		return job.waitForCompletion(true)?0:1;
	}

	public static void main(String[] args) {

		int exitCode;
		try {
			exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
			System.exit(exitCode);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

执行命令

hadoop jar hadoop-example.jar mapreduce.ncdc.sequence.SmallFilesToSequenceFileConverter -conf conf/hadoop-localhost.xml -D mapreduce.job.reduces=2 input/smallfiles output1

执行结果

[hadoop@mycluster ~]$ hdfs dfs -text output1/part-r-00000
file1.txt       68 65 6c 6c 6f 31 09 77 6f 72 6c 64 31 09 63 68 69 6e 61 31 0a
file3.txt       68 65 6c 6c 6f 33 33 09 77 6f 72 6c 64 33 33 09 63 68 69 6e 61 33 33 0a
[hadoop@mycluster ~]$ hdfs dfs -text output1/part-r-00001
file2.txt       68 65 6c 6c 6f 32 09 77 6f 72 6c 64 32 09 63 68 69 6e 61 32 0a

(2)          存档HDFS后,对HDFS上文件进行MapReduce工作统计,采用CombineFileInputFormat


最后

以上就是听话马里奥为你收集整理的MR-2.输入格式(InputFormat)CombineFileInputFormat源码分析的全部内容,希望文章能够帮你解决MR-2.输入格式(InputFormat)CombineFileInputFormat源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(68)

评论列表共有 0 条评论

立即
投稿
返回
顶部