概述
一、前述
剖析hadoop源码,首先从Client看起,因为Client在MapReduce的过程中承担了很多重要的角色。
二、MapReduce框架主类
代码如下:
public static void main(String[] args) throws Exception {
// System.setProperty("hadoop.home.dir", "E:/Hadoop/hadoop-2.6.5");
Configuration conf = new Configuration();
// Job job = new Job(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat.addInputPath(job, new Path("hdfs://node03:9000/lxk/haha.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://node03:9000/lxk01/haha.txt"));
// 向yarn集群提交这个job
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
第一步,先分析Job,可以看见源码中Job实现了public class Job extends JobContextImpl implements JobContext
然后JobContext实现了 MRJobConfig,可以看见其中有很多配置
因为job中传的参数为conf,所以这里的配置即对应我们的配置文件中的属性值。
Job job = Job.getInstance(conf); 挑几个重要的看下:
// 默认的Mapper任务内存大小。
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
// 默认的Mapper任务计算使用CPU资源核数
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
public static final int DEFAULT_MAP_CPU_VCORES = 1;
// 默认的reduce任务内存大小。
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
// 默认的reduce任务计算使用CPU资源核数
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
第二步,分析提交过程 job.waitForCompletion(true); 追踪源码发现主要实现这个类
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException
- Checking the input and output specifications of the job. //检查输入输出路径
- Computing the
InputSplit
s for the job. //检查切片 - Setup the requisite accounting information for the
DistributedCache
of the job, if necessary. - Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
- Submitting the job to the
JobTracker
and optionally monitoring it's status.
在此方法中,重点看下此方法 int maps = writeSplits(job, submitJobDir);
追踪后具体实现可知
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
追踪job.getInputFormatClass()可以发现如下代码: 所以可得知用户的默认输入类是TextInputformat类并且继承关系如下: TextInputforMat-->FileinputFormat-->InputFormat
追踪 List<InputSplit> splits = input.getSplits(job);可以得到如下源码:
最为重要的一个源码!!!!!!!!!!!
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));如果用户设置则取用户,没有是1
long maxSize = getMaxSplitSize(job);//如果用户设置则取用户,没有取最大值
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();//取输入文件的大小和路径
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);//获得所有块的位置。
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);//获得切片大小
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//这一块传参传的是切块的偏移量,返回这个块的索引
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),//根据当前块的索引号取出来块的位置包括副本的位置 然后传递给切片,然后切片知道往哪运算。即往块的位置信息计算
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
1.long splitSize = computeSplitSize(blockSize, minSize, maxSize);追踪源码发现
1 2 3 |
|
切片大小默认是块的大小!!!!
假如让切片大小 < 块的大小则更改配置的最大值MaxSize,让其小于blocksize
假如让切片大小 > 块的大小则更改配置的最小值MinSize,让其大于blocksize
通过FileInputFormat.setMinInputSplitSize即可。
2. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining) 追踪源码发现
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
//切片要大于>=块的起始量,小于一个块的末尾量。
return i;//返回这个块
}
}
BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
throw new IllegalArgumentException("Offset " + offset +
" is outside of file (0.." +
fileLength + ")");
}
3. splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()
创建切片的时候,一个切片对应一个mapperr任务,所以创建切片的四个位置(path,0,10,host)
根据host可知mapper任务的计算位置,则对应计算向数据移动!!!!块是逻辑的,并没有真正切割数据。!!
4.上述getSplits方法最终得到一个切片的清单,清单的数目就是mapper的数量!!即开始方法的入口 int maps = writeSplits(job, submitJobDir);返回值。
5.计算向数据移动时会拉取只属于自己的文件。
参考:https://www.cnblogs.com/LHWorldBlog/p/8244881.html
最后
以上就是笨笨可乐为你收集整理的Hadoop源码篇之客户端Job的全部内容,希望文章能够帮你解决Hadoop源码篇之客户端Job所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复