Overview
This article walks through the MapReduce job submission path in the Hadoop source, from Job.waitForCompletion() through split computation to the LocalJobRunner's map and reduce tasks, and then looks at the program's default components and the InputFormat split logic.
1. Entering the job submission method
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
// Check the job state: submit only if it is still DEFINE, so a job that is already RUNNING is not submitted twice
if (state == JobState.DEFINE) {
submit();
}
// If verbose, monitor the job and print its progress while it runs
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
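For context, this is the method a driver reaches at the very end of job setup. A minimal driver sketch is shown below; the class names, mapper/reducer implementations and paths are illustrative, not taken from the source above (the mapper and reducer are sketched in sections 2.4 and 3.2).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical word-count driver used as a running example in this article.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion(true) submits the job and blocks until it finishes,
    // printing progress because verbose is true.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}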
1.1 Submitting the job to the cluster
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// connect() creates the Cluster object, which holds two key things:
// 1) the file system used to read the input data and to write out and store the results
// 2) the client that actually runs the job: LocalJobRunner when running locally, the YARN runner when running on YARN
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
1.2 Creating the Cluster
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
// Create the Cluster that will run the job, based on the user's configuration
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}
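Which client the new Cluster ends up holding is driven by configuration rather than code. A minimal sketch of the two usual settings; the class name and the ResourceManager host are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class FrameworkNameExample {
  public static Job buildJob() throws Exception {
    Configuration conf = new Configuration();
    // "local" (the default) makes Cluster pick the LocalJobRunner client.
    conf.set("mapreduce.framework.name", "local");
    // "yarn" would make Cluster pick the YARN runner instead:
    // conf.set("mapreduce.framework.name", "yarn");
    // conf.set("yarn.resourcemanager.hostname", "rm-host");  // placeholder host
    return Job.getInstance(conf);
  }
}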
1.3 Submitting the job through the JobSubmitter
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// Validate the output specification: the output directory must be set and must not already exist
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
// Get the staging area path where this job's submission files will be placed
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// For a local submission, the job's staging directory lives under /tmp on the local file system (e.g. the drive holding the IDE workspace);
// for a submission to YARN, it lives under /tmp on HDFS
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
}
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// Compute the input splits; this produces a split file plus split meta info describing the splits
// The split file records how every file in the input directory was sliced; each slice is a FileSplit object
// The split meta info describes each split, in particular which nodes the data should be read from
int maps = writeSplits(job, submitJobDir);
// Set mapreduce.job.maps to the number of splits
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write the job's full configuration into job.xml in the submit directory
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
// Hand the job off to the submission client
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
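By the time submitJobInternal() hands off to submitClient.submitJob(), the staging directory holds everything the runner needs. As a rough sketch (the staging root shown is the YARN default; the job id is made up), it looks like this:

/tmp/hadoop-yarn/staging/<user>/.staging/job_1234567890123_0001/
    job.jar             <- the job jar copied by copyAndConfigureFiles()
    job.split           <- the serialized input splits written by writeSplits()
    job.splitmetainfo   <- per-split meta info (locations and offsets)
    job.xml             <- the full job configuration written by writeConf()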
1.4 Submitting the job
public org.apache.hadoop.mapreduce.JobStatus submitJob(
org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
Credentials credentials) throws IOException {
// Rebuild a runnable Job from the submission files prepared earlier
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
job.job.setCredentials(credentials);
return job.status;
}
1.5 Creating the Job object that LocalJobRunner can execute
public Job(JobID jobid, String jobSubmitDir) throws IOException {
……
// Reload the job settings generated during submission into this LocalJobRunner$Job
// and start a separate thread to run the job
this.start();
}
1.6 The LocalJobRunner Job's run() method
In outline, LocalJobRunner$Job.run() reads the split meta info back from the submit directory, creates one MapTaskRunnable per split and executes them on a thread pool, and then runs the reduce tasks over the map outputs; section 2 follows the map side of that flow.
III. InputFormat splitting
3.1 The default split logic
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
3.2 How the default split size is computed
By default the split size equals the block size. Setting mapreduce.input.fileinputformat.split.maxsize below the block size makes the splits smaller than a block; setting mapreduce.input.fileinputformat.split.minsize above the block size makes them larger than a block. For example, with a 128 MB block size, a maxsize of 64 MB yields 64 MB splits, while a minsize of 256 MB yields 256 MB splits.
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
3.3 Summary
① Splitting works over all files in the input directory, one file at a time: the file is the unit of splitting.
② For each file, isSplitable() first decides whether it can be split; ordinary files are splittable, while compressed files may or may not be, depending on the codec.
③ If a file cannot be split, the whole file becomes a single split.
④ If a file can be split, the split size is determined; by default it follows the file system's block size.
⑤ The file is then cut into splits of that size; the last split may end up larger than the split size, but by no more than 10% (a remainder of up to 1.1 × the split size is kept as one split).
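These two properties can also be set from the driver through FileInputFormat's helper methods; a small sketch, assuming a 128 MB block size (the concrete sizes and class name are only examples):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
  public static void tune(Job job) {
    // Shrink splits below the block size (writes mapreduce.input.fileinputformat.split.maxsize):
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);      // 64 MB splits
    // Or grow splits above the block size (writes mapreduce.input.fileinputformat.split.minsize):
    // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB splits
  }
}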
2. Entering the Map phase
2.1 Entering MapTaskRunnable.run()
public void run() {
  try {
    // Generate the attempt id for this map task
    TaskAttemptID mapId = new TaskAttemptID(new TaskID(
        jobId, TaskType.MAP, taskId), 0);
    LOG.info("Starting task: " + mapId);
    mapIds.add(mapId);
    MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
        info.getSplitIndex(), 1);
    map.setUser(UserGroupInformation.getCurrentUser().
        getShortUserName());
    setupChildMapredLocalDirs(map, localConf);

    // Create the object that manages this MapTask's output files
    MapOutputFile mapOutput = new MROutputFiles();
    mapOutput.setConf(localConf);
    mapOutputFiles.put(mapId, mapOutput);

    map.setJobFile(localJobFile.toString());
    localConf.setUser(map.getUser());
    map.localizeConfiguration(localConf);
    map.setConf(localConf);
    try {
      map_tasks.getAndIncrement();
      myMetrics.launchMap(mapId);
      // Enter MapTask.run()
      map.run(localConf, Job.this);
      myMetrics.completeMap(mapId);
    } finally {
      map_tasks.getAndDecrement();
    }

    LOG.info("Finishing task: " + mapId);
  } catch (Throwable e) {
    this.storedException = e;
  }
}
2.2 MapTask.run()
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // Decide whether a sort phase is needed.
  // A MapTask can be split into two phases:
  //   map  : call the Mapper's map() method on every input key-value pair
  //   sort : when map() writes output via context.write(), the key-value pairs
  //          are sorted before they are saved to file
  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map
    // phase will govern the entire attempt's progress.
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase = getProgress().addPhase("sort", 0.333f);
    }
  }
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
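The reducer count that this branch inspects comes straight from the driver. A map-only job, which skips the sort phase entirely and writes mapper output directly, is requested with a single call (a sketch, not taken from the source above):

// In the driver: zero reduce tasks means no sort phase and no reduce phase.
job.setNumReduceTasks(0);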
2.3 Running the Mapper
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
          (split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      mapContext =
        new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
            input, output, committer, reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
      mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
2.4 Entering Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
  // setup() runs first
  setup(context);
  try {
    // Call map() once for every input key-value pair
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    // cleanup() always runs last
    cleanup(context);
  }
}
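Since Mapper.run() simply drives setup(), then map() per record, then cleanup(), a user Mapper only overrides the hooks it needs. A minimal sketch matching the hypothetical WordCountDriver shown in section 1:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (word, 1) for every word in a line; map() is called once per key-value pair by Mapper.run().
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE);
      }
    }
  }
}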
3. Entering the Reduce phase
3.1 ReduceTask.run()
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  // A ReduceTask runs in roughly three phases:
  //   copy   : fetch this ReduceTask's partition files from every MapTask
  //   sort   : merge the fetched partition files into one file, keeping the
  //            merged result sorted (merge and sort)
  //   reduce : hand the merged, sorted data to the reducer
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewReducer();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  // Initialize the codec
  codec = initCodec();
  RawKeyValueIterator rIter = null;
  ShuffleConsumerPlugin shuffleConsumerPlugin = null;

  Class combinerClass = conf.getCombinerClass();
  CombineOutputCollector combineCollector =
      (null != combinerClass) ?
          new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

  Class<? extends ShuffleConsumerPlugin> clazz =
      job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

  shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
  LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

  ShuffleConsumerPlugin.Context shuffleContext =
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
          super.lDirAlloc, reporter, codec,
          combinerClass, combineCollector,
          spilledRecordsCounter, reduceCombineInputCounter,
          shuffledMapsCounter,
          reduceShuffleBytes, failedShuffleCounter,
          mergedMapOutputsCounter,
          taskStatus, copyPhase, sortPhase, this,
          mapOutputFile, localMapFiles);
  shuffleConsumerPlugin.init(shuffleContext);

  rIter = shuffleConsumerPlugin.run();

  // free up the data structures
  mapOutputFilesOnDisk.clear();

  sortPhase.complete();                         // sort is complete
  setPhase(TaskStatus.Phase.REDUCE);
  statusUpdate(umbilical);
  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  RawComparator comparator = job.getOutputValueGroupingComparator();

  if (useNewApi) {
    // Enter the reducer via the new API
    runNewReducer(job, umbilical, reporter, rIter, comparator,
        keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator,
        keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}
3.2 runNewReducer()
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException,InterruptedException,
                            ClassNotFoundException {
  // wrap value iterator to report progress.
  final RawKeyValueIterator rawIter = rIter;
  rIter = new RawKeyValueIterator() {
    public void close() throws IOException {
      rawIter.close();
    }
    public DataInputBuffer getKey() throws IOException {
      return rawIter.getKey();
    }
    public Progress getProgress() {
      return rawIter.getProgress();
    }
    public DataInputBuffer getValue() throws IOException {
      return rawIter.getValue();
    }
    public boolean next() throws IOException {
      boolean ret = rawIter.next();
      reporter.setProgress(rawIter.getProgress().getProgress());
      return ret;
    }
  };
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
  job.setBoolean("mapred.skip.on", isSkipping());
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
      createReduceContext(reducer, job, getTaskID(),
          rIter, reduceInputKeyCounter,
          reduceInputValueCounter,
          trackedRW,
          committer,
          reporter, comparator, keyClass,
          valueClass);
  try {
    // Analogous to Mapper.run(): setup(), then reduce() per key group, then cleanup()
    reducer.run(reducerContext);
  } finally {
    trackedRW.close(reducerContext);
  }
}
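reducer.run(reducerContext) follows the same pattern as Mapper.run(): setup(), then reduce() once per key group, then cleanup(). A minimal sketch matching the hypothetical WordCount classes used above:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the counts for each word; reduce() is called once per key with all of its values.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

The same class is often registered as the combiner as well, via job.setCombinerClass(WordCountReducer.class), which is what the conf.getCombinerClass() lookup in ReduceTask.run() above picks up.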
II. The program's default components
1. The default OutputFormat
The output format class is read from mapreduce.job.outputformat.class in the Configuration object; if it is not set, the default TextOutputFormat is used.
@SuppressWarnings("unchecked")
public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
throws ClassNotFoundException {
return (Class<? extends OutputFormat<?,?>>)
conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
}
2. The default InputFormat
The input format class is read from mapreduce.job.inputformat.class; if it is not set, TextInputFormat.class is used.
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
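To override either default, the driver sets the classes explicitly; the formats chosen here are only examples:

// These setters write mapreduce.job.inputformat.class / mapreduce.job.outputformat.class.
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);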