This post, shared by 靠谱客 blogger 听话绿草, is a write-up on Hadoop job scheduling collected during recent development work, offered here as a reference.






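Task selection happens after the scheduler has picked a job; the goal is to choose the most suitable task from that job. In Hadoop, the most important factor when selecting a Map Task is data locality, that is, scheduling the task onto the node that holds its input data whenever possible. Beyond data locality, the scheduler also has to consider the order in which failed tasks and backup (speculative) tasks are scheduled. Reduce Tasks, by contrast, have no data locality to speak of, so selecting a Reduce Task usually only involves the scheduling order of not-yet-run tasks and backup tasks.
Suppose that at some moment TaskTracker X has idle compute resources and sends a heartbeat to the JobTracker to request new tasks, and the scheduler, following its scheduling policy, selects task Y for it.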
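
As a rough illustration of what data locality means for a tracker such as X, the sketch below classifies a candidate Map Task as node-local, rack-local, or off-switch by comparing the tracker's host and rack with the hosts that hold the task's input. It is not Hadoop code: `LocalitySketch`, `Host`, and `classify` are hypothetical names, and racks are plain strings chosen for the example.

```java
import java.util.Arrays;
import java.util.List;

final class LocalitySketch {
    enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    /** Hypothetical host description: a host name plus the rack it sits in. */
    static final class Host {
        final String name;
        final String rack;
        Host(String name, String rack) { this.name = name; this.rack = rack; }
    }

    /** Best locality the tracker can get for a task whose input lives on splitHosts. */
    static Locality classify(Host tracker, List<Host> splitHosts) {
        Locality best = Locality.OFF_SWITCH;
        for (Host h : splitHosts) {
            if (h.name.equals(tracker.name)) {
                return Locality.NODE_LOCAL;      // input is on the tracker itself
            }
            if (h.rack.equals(tracker.rack)) {
                best = Locality.RACK_LOCAL;      // same rack, different host
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Host x = new Host("x", "/rack1");
        List<Host> inputHosts = Arrays.asList(new Host("a", "/rack1"), new Host("b", "/rack2"));
        System.out.println(classify(x, inputHosts));
    }
}
```

A scheduler that prefers locality would try tasks in the order NODE_LOCAL, RACK_LOCAL, OFF_SWITCH.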

(2) Map Task selection policy
The org.apache.hadoop.mapred.JobInProgress object, which is used to track a job's running state, maintains five data structures for Map Tasks:

// NetworkTopology Node to the set of TIPs
// Maps each Node to its not-yet-running TIPs; this can be obtained directly
// from the job's InputFormat.
Map<Node, List<TaskInProgress>> nonRunningMapCache;
// Map of NetworkTopology Node to set of running TIPs
// Maps each Node to its running TIPs; once a task gets a scheduling
// opportunity, its TIP is added here.
Map<Node, Set<TaskInProgress>> runningMapCache;
// A list of non-local, non-running maps
// TIPs that are non-local and not yet running. "Non-local" means the task has
// no input data (its InputSplit is empty); these may be compute-intensive
// tasks, such as the PI job in the Hadoop examples.
final List<TaskInProgress> nonLocalMaps;
// Set of failed, non-running maps sorted by #failures
// TIPs ordered by their number of failed Task Attempts.
final SortedSet<TaskInProgress> failedMaps;
// A set of non-local running maps
// TIPs that are non-local and currently running.
Set<TaskInProgress> nonLocalRunningMaps;
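
To show how a structure shaped like nonRunningMapCache can serve both node-local and rack-local lookups, here is a minimal sketch that registers each task under its host and under that host's ancestors. It assumes topology nodes are written as path strings such as /rack1/hostA (Hadoop itself uses Node objects), and `MapCacheSketch`, `parent`, and `register` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MapCacheSketch {
    /** Parent of a topology path, e.g. "/rack1/hostA" -> "/rack1"; null above the top level. */
    static String parent(String path) {
        int slash = path.lastIndexOf('/');
        return slash <= 0 ? null : path.substring(0, slash);
    }

    /** Registers a task under its host and under the host's ancestors, maxLevel levels in total. */
    static void register(Map<String, List<String>> cache, String hostPath, String task, int maxLevel) {
        String key = hostPath;
        for (int level = 0; level < maxLevel && key != null; level++) {
            List<String> tasks = cache.get(key);
            if (tasks == null) {
                tasks = new ArrayList<String>();
                cache.put(key, tasks);
            }
            if (!tasks.contains(task)) {
                tasks.add(task);                 // avoid duplicates when replicas share a rack
            }
            key = parent(key);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> cache = new HashMap<String, List<String>>();
        register(cache, "/rack1/hostA", "m0", 2);  // m0 has replicas on hostA and hostB
        register(cache, "/rack1/hostB", "m0", 2);
        register(cache, "/rack2/hostC", "m1", 2);
        System.out.println(cache);
    }
}
```

With two levels, a tracker on hostB finds m0 under its own entry, while any other tracker in /rack1 finds it one level up.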


public synchronized List<Task> assignTasks(TaskTracker taskTracker)
    throws IOException {
  // Check for JT safe-mode
  if (taskTrackerManager.isInSafeMode()) {
    LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
    return null;
  }

  TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  final int numTaskTrackers = clusterStatus.getTaskTrackers();
  final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
  final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();

  // Get map + reduce counts for the current tracker.
  final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
  final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
  final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
  final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

  // Assigned tasks
  List<Task> assignedTasks = new ArrayList<Task>();

  // Compute (running + pending) map and reduce task numbers across pool
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
        if (job.scheduleReduces()) {
          remainingReduceLoad +=
            (job.desiredReduces() - job.finishedReduces());
        }
      }
    }
  }

  // Compute the 'load factor' for maps and reduces
  double mapLoadFactor = 0.0;
  if (clusterMapCapacity > 0) {
    mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
  }
  double reduceLoadFactor = 0.0;
  if (clusterReduceCapacity > 0) {
    reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
  }

  // In the below steps, we allocate first map tasks (if appropriate),
  // and then reduce tasks if appropriate.  We go through all jobs
  // in order of job arrival; jobs only get serviced if their
  // predecessors are serviced, too.

  // We assign tasks to the current taskTracker if the given machine
  // has a workload that's less than the maximum load of that kind of
  // task.
  // However, if the cluster is close to getting loaded i.e. we don't
  // have enough _padding_ for speculative executions etc., we only
  // schedule the "highest priority" task i.e. the task from the job
  // with the highest priority.
  final int trackerCurrentMapCapacity =
    Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
             trackerMapCapacity);
  int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
  boolean exceededMapPadding = false;
  if (availableMapSlots > 0) {
    exceededMapPadding =
      exceededPadding(true, clusterStatus, trackerMapCapacity);
  }

  int numLocalMaps = 0;
  int numNonLocalMaps = 0;
  scheduleMaps:
  for (int i=0; i < availableMapSlots; ++i) {
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }

        Task t = null;

        // Try to schedule a Map task with locality between node-local
        // and rack-local
        t =
          job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
              numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numLocalMaps;

          // Don't assign map tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededMapPadding) {
            break scheduleMaps;
          }

          // Try all jobs again for the next Map task
          break;
        }

        // Try to schedule an off-switch or speculative Map task
        t =
          job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
              taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numNonLocalMaps;

          // We assign at most 1 off-switch or speculative task
          // This is to prevent TaskTrackers from stealing local-tasks
          // from other TaskTrackers.
          break scheduleMaps;
        }
      }
    }
  }
  int assignedMaps = assignedTasks.size();

  // Same thing, but for reduce tasks
  // However we _never_ assign more than 1 reduce task per heartbeat
  final int trackerCurrentReduceCapacity =
    Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
             trackerReduceCapacity);
  final int availableReduceSlots =
    Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
  boolean exceededReducePadding = false;
  if (availableReduceSlots > 0) {
    exceededReducePadding = exceededPadding(false, clusterStatus,
                                            trackerReduceCapacity);
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        Task t =
          job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                  taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          break;
        }

        // Don't assign reduce tasks to the hilt!
        // Leave some free slots in the cluster for future task-failures,
        // speculative tasks etc. beyond the highest priority job
        if (exceededReducePadding) {
          break;
        }
      }
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
              "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
              trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
              (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
              assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
              ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
              trackerCurrentReduceCapacity + "," + trackerRunningReduces +
              "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
              ", " + (assignedTasks.size()-assignedMaps) + "]");
  }

  return assignedTasks;
}
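
The slot arithmetic in assignTasks is easier to follow with concrete numbers. The sketch below reproduces only the load-factor formula from the listing (remaining load divided by cluster capacity, scaled to the tracker and capped at the tracker's own capacity). The class and method names (`LoadFactorSketch`, `currentCapacity`, `availableSlots`) are hypothetical, and the numbers are made up for the example.

```java
final class LoadFactorSketch {
    /** Slots on one tracker the scheduler is willing to fill, capped at the tracker's capacity. */
    static int currentCapacity(int remainingLoad, int clusterCapacity, int trackerCapacity) {
        double loadFactor = clusterCapacity > 0
            ? (double) remainingLoad / clusterCapacity
            : 0.0;
        return Math.min((int) Math.ceil(loadFactor * trackerCapacity), trackerCapacity);
    }

    /** Slots still free for new tasks once the tracker's running tasks are subtracted. */
    static int availableSlots(int remainingLoad, int clusterCapacity, int trackerCapacity, int running) {
        return currentCapacity(remainingLoad, clusterCapacity, trackerCapacity) - running;
    }

    public static void main(String[] args) {
        // 30 unfinished maps, 100 map slots cluster-wide, 4 map slots on this tracker
        System.out.println(currentCapacity(30, 100, 4));    // ceil(0.3 * 4) = 2
        System.out.println(availableSlots(30, 100, 4, 1));  // 2 - 1 = 1
        System.out.println(currentCapacity(500, 100, 4));   // overloaded cluster: capped at 4
    }
}
```

On a lightly loaded cluster this keeps each tracker from filling all of its slots, which spreads tasks across trackers instead of packing them onto whichever tracker sends a heartbeat first.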


List<Task> assignTasks(TaskTracker taskTracker) {
  List<Task> taskList = new ArrayList<Task>();
  while (taskTracker.askForTasks()) {                 // keep assigning new tasks
    Queue queue = selectAQueueFromCluster();          // select a queue from the system
    JobInProgress job = selectAJobFromQueue(queue);   // select a job from the queue
    Task task = job.obtainNewTask(job);               // select a task from the job
    taskList.add(task);
    taskTracker.addNewTask(task);
  }
  return taskList;
}


// Create/manage tasks
/**
 * Return a MapTask, if appropriate, to run on the given tasktracker
 */
public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
                                          int clusterSize,
                                          int numUniqueHosts
                                         ) throws IOException {
  return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts,
                                anyCacheLevel);
}

This calls obtainNewMapTaskCommon in org.apache.hadoop.mapred.JobInProgress:

/**
 * Return a MapTask with locality level that smaller or equal than a given
 * locality level to tasktracker.
 * @param tts The task tracker that is asking for a task
 * @param clusterSize The number of task trackers in the cluster
 * @param numUniqueHosts The number of hosts that run task trackers
 * @param avgProgress The average progress of this kind of task in this job
 * @param maxCacheLevel The maximum topology level until which to schedule
 * @return the index in tasks of the selected task (or -1 for no task)
 * @throws IOException
 */
public synchronized Task obtainNewMapTaskCommon(
    TaskTrackerStatus tts, int clusterSize, int numUniqueHosts,
    int maxCacheLevel) throws IOException {
  if (!tasksInited) {
    LOG.info("Cannot create task split for " + profile.getJobID());
    try { throw new IOException("state = " + status.getRunState()); }
    catch (IOException ioe) {ioe.printStackTrace();}
    return null;
  }

  int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel,
                              status.mapProgress());
  if (target == -1) {
    return null;
  }

  Task result = maps[target].getTaskToRun(tts.getTrackerName());
  if (result != null) {
    addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
    // DO NOT reset for off-switch!
    if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
      resetSchedulingOpportunities();
    }
  }
  return result;
}


/**
 * Find new map task
 * @param tts The task tracker that is asking for a task
 * @param clusterSize The number of task trackers in the cluster
 * @param numUniqueHosts The number of hosts that run task trackers
 * @param avgProgress The average progress of this kind of task in this job
 * @param maxCacheLevel The maximum topology level until which to schedule
 *                      maps.
 *                      A value of {@link #anyCacheLevel} implies any
 *                      available task (node-local, rack-local, off-switch and
 *                      speculative tasks).
 *                      A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
 *                      off-switch/speculative tasks should be scheduled.
 * @return the index in tasks of the selected task (or -1 for no task)
 */
private synchronized int findNewMapTask(final TaskTrackerStatus tts,
                                        final int clusterSize,
                                        final int numUniqueHosts,
                                        final int maxCacheLevel,
                                        final double avgProgress) {
  if (numMapTasks == 0) {
    if(LOG.isDebugEnabled()) {
      LOG.debug("No maps to schedule for " + profile.getJobID());
    }
    return -1;
  }

  String taskTracker = tts.getTrackerName();
  TaskInProgress tip = null;

  // Update the last-known clusterSize
  this.clusterSize = clusterSize;

  if (!shouldRunOnTaskTracker(taskTracker)) {
    return -1;
  }

  // Check to ensure this TaskTracker has enough resources to
  // run tasks from this job
  long outSize = resourceEstimator.getEstimatedMapOutputSize();
  long availSpace = tts.getResourceStatus().getAvailableSpace();
  if(availSpace < outSize) {
    LOG.warn("No room for map task. Node " + tts.getHost() +
             " has " + availSpace +
             " bytes free; but we expect map to take " + outSize);
    return -1; //see if a different TIP might work better.
  }

  // When scheduling a map task:
  //  0) Schedule a failed task without considering locality
  //  1) Schedule non-running tasks
  //  2) Schedule speculative tasks
  //  3) Schedule tasks with no location information

  // First a look up is done on the non-running cache and on a miss, a look
  // up is done on the running cache. The order for lookup within the cache:
  //   1. from local node to root [bottom up]
  //   2. breadth wise for all the parent nodes at max level
  // We fall to linear scan of the list ((3) above) if we have misses in the
  // above caches

  // 0) Schedule the task with the most failures, unless failure was on this
  //    machine
  tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
  if (tip != null) {
    // Add to the running list
    scheduleMap(tip);
    LOG.info("Choosing a failed task " + tip.getTIPId());
    return tip.getIdWithinJob();
  }

  Node node = jobtracker.getNode(tts.getHost());

  // 1) Non-running TIP :

  // 1. check from local node to the root [bottom up cache lookup]
  //    i.e if the cache is available and the host has been resolved
  if (node != null) {
    Node key = node;
    int level = 0;
    // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
    // called to schedule any task (local, rack-local, off-switch or
    // speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
    // findNewMapTask is to only schedule off-switch/speculative tasks
    int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
    for (level = 0;level < maxLevelToSchedule; ++level) {
      List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
      if (cacheForLevel != null) {
        tip = findTaskFromList(cacheForLevel, tts,
            numUniqueHosts,level == 0);
        if (tip != null) {
          // Add to running cache
          scheduleMap(tip);

          // remove the cache if its empty
          if (cacheForLevel.size() == 0) {
            nonRunningMapCache.remove(key);
          }

          return tip.getIdWithinJob();
        }
      }
      key = key.getParent();
    }

    // Check if we need to only schedule a local task (node-local/rack-local)
    if (level == maxCacheLevel) {
      return -1;
    }
  }

  // 2. Search breadth-wise across parents at max level for non-running
  //    TIP if
  //      - cache exists and there is a cache miss
  //      - node information for the tracker is missing (tracker's topology
  //        info not obtained yet)

  // collection of node at max level in the cache structure
  Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();

  // get the node parent at max level
  Node nodeParentAtMaxLevel =
    (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);

  for (Node parent : nodesAtMaxLevel) {

    // skip the parent that has already been scanned
    if (parent == nodeParentAtMaxLevel) {
      continue;
    }

    List<TaskInProgress> cache = nonRunningMapCache.get(parent);
    if (cache != null) {
      tip = findTaskFromList(cache, tts, numUniqueHosts, false);
      if (tip != null) {
        // Add to the running cache
        scheduleMap(tip);

        // remove the cache if empty
        if (cache.size() == 0) {
          nonRunningMapCache.remove(parent);
        }
        LOG.info("Choosing a non-local task " + tip.getTIPId());
        return tip.getIdWithinJob();
      }
    }
  }

  // 3. Search non-local tips for a new task
  tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
  if (tip != null) {
    // Add to the running list
    scheduleMap(tip);

    LOG.info("Choosing a non-local task " + tip.getTIPId());
    return tip.getIdWithinJob();
  }

  // 2) Running TIP :

  if (hasSpeculativeMaps) {
    long currentTime = jobtracker.getClock().getTime();

    // 1. Check bottom up for speculative tasks from the running cache
    if (node != null) {
      Node key = node;
      for (int level = 0; level < maxLevel; ++level) {
        Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
        if (cacheForLevel != null) {
          tip = findSpeculativeTask(cacheForLevel, tts,
                                    avgProgress, currentTime, level == 0);
          if (tip != null) {
            if (cacheForLevel.size() == 0) {
              runningMapCache.remove(key);
            }
            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }
    }

    // 2. Check breadth-wise for speculative tasks
    for (Node parent : nodesAtMaxLevel) {
      // ignore the parent which is already scanned
      if (parent == nodeParentAtMaxLevel) {
        continue;
      }

      Set<TaskInProgress> cache = runningMapCache.get(parent);
      if (cache != null) {
        tip = findSpeculativeTask(cache, tts, avgProgress,
                                  currentTime, false);
        if (tip != null) {
          // remove empty cache entries
          if (cache.size() == 0) {
            runningMapCache.remove(parent);
          }
          LOG.info("Choosing a non-local task " + tip.getTIPId()
                   + " for speculation");
          return tip.getIdWithinJob();
        }
      }
    }

    // 3. Check non-local tips for speculation
    tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress,
                              currentTime, false);
    if (tip != null) {
      LOG.info("Choosing a non-local task " + tip.getTIPId()
               + " for speculation");
      return tip.getIdWithinJob();
    }
  }

  return -1;
}


// Check to ensure this TaskTracker has enough resources to
// run tasks from this job
long outSize = resourceEstimator.getEstimatedMapOutputSize();
long availSpace = tts.getResourceStatus().getAvailableSpace();
if(availSpace < outSize) {
  LOG.warn("No room for map task. Node " + tts.getHost() +
           " has " + availSpace +
           " bytes free; but we expect map to take " + outSize);
  return -1; //see if a different TIP might work better.
}

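2) Select a task from the failedMaps list. failedMaps holds the TIPs ordered by their number of failed Task Attempts; the more often a task has failed, the earlier it is considered for scheduling. Note that, so that failed tasks get rerun as quickly as possible, data locality is no longer taken into account when selecting from this list.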

// When scheduling a map task:
//  0) Schedule a failed task without considering locality
//  1) Schedule non-running tasks
//  2) Schedule speculative tasks
//  3) Schedule tasks with no location information

// First a look up is done on the non-running cache and on a miss, a look
// up is done on the running cache. The order for lookup within the cache:
//   1. from local node to root [bottom up]
//   2. breadth wise for all the parent nodes at max level
// We fall to linear scan of the list ((3) above) if we have misses in the
// above caches

// 0) Schedule the task with the most failures, unless failure was on this
//    machine
tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
if (tip != null) {
  // Add to the running list
  scheduleMap(tip);
  LOG.info("Choosing a failed task " + tip.getTIPId());
  return tip.getIdWithinJob();
}
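
The ordering of failedMaps can be pictured as a sorted set whose comparator puts the task with the most failed attempts first. The following is a minimal sketch under that assumption, not the JobInProgress implementation: `FailedMapsSketch`, `Tip`, and `MOST_FAILURES_FIRST` are hypothetical, and ties are broken by task id so that two different tasks never compare as equal (a TreeSet would otherwise drop one of them).

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

final class FailedMapsSketch {
    /** Hypothetical stand-in for a TIP: an id plus its number of failed attempts. */
    static final class Tip {
        final int id;
        final int failures;
        Tip(int id, int failures) { this.id = id; this.failures = failures; }
    }

    /** More failures first; ties broken by ascending id. */
    static final Comparator<Tip> MOST_FAILURES_FIRST = new Comparator<Tip>() {
        @Override
        public int compare(Tip a, Tip b) {
            int byFailures = Integer.compare(b.failures, a.failures);
            return byFailures != 0 ? byFailures : Integer.compare(a.id, b.id);
        }
    };

    static SortedSet<Tip> newFailedSet() {
        return new TreeSet<Tip>(MOST_FAILURES_FIRST);
    }

    public static void main(String[] args) {
        SortedSet<Tip> failed = newFailedSet();
        failed.add(new Tip(7, 1));
        failed.add(new Tip(3, 3));
        failed.add(new Tip(5, 1));
        for (Tip t : failed) {
            System.out.println("tip " + t.id + " failures=" + t.failures);
        }
    }
}
```

Iterating the set visits tip 3 (three failures) before tips 5 and 7 (one failure each), which is the order in which a scheduler scanning the set would consider them.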


Node node = jobtracker.getNode(tts.getHost());

// 1) Non-running TIP :

// 1. check from local node to the root [bottom up cache lookup]
//    i.e if the cache is available and the host has been resolved
if (node != null) {
  Node key = node;
  int level = 0;
  // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
  // called to schedule any task (local, rack-local, off-switch or
  // speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
  // findNewMapTask is to only schedule off-switch/speculative tasks
  int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
  for (level = 0;level < maxLevelToSchedule; ++level) {
    List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
    if (cacheForLevel != null) {
      tip = findTaskFromList(cacheForLevel, tts,
          numUniqueHosts,level == 0);
      if (tip != null) {
        // Add to running cache
        scheduleMap(tip);

        // remove the cache if its empty
        if (cacheForLevel.size() == 0) {
          nonRunningMapCache.remove(key);
        }

        return tip.getIdWithinJob();
      }
    }
    key = key.getParent();
  }

  // Check if we need to only schedule a local task (node-local/rack-local)
  if (level == maxCacheLevel) {
    return -1;
  }
}

// 2. Search breadth-wise across parents at max level for non-running
//    TIP if
//      - cache exists and there is a cache miss
//      - node information for the tracker is missing (tracker's topology
//        info not obtained yet)

// collection of node at max level in the cache structure
Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();

// get the node parent at max level
Node nodeParentAtMaxLevel =
  (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);

for (Node parent : nodesAtMaxLevel) {

  // skip the parent that has already been scanned
  if (parent == nodeParentAtMaxLevel) {
    continue;
  }

  List<TaskInProgress> cache = nonRunningMapCache.get(parent);
  if (cache != null) {
    tip = findTaskFromList(cache, tts, numUniqueHosts, false);
    if (tip != null) {
      // Add to the running cache
      scheduleMap(tip);

      // remove the cache if empty
      if (cache.size() == 0) {
        nonRunningMapCache.remove(parent);
      }
      LOG.info("Choosing a non-local task " + tip.getTIPId());
      return tip.getIdWithinJob();
    }
  }
}
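
The bottom-up lookup above can be reduced to a few lines once the topology is simplified. The sketch below assumes nodes are path strings such as /rack1/hostA and that the cache maps each path to a mutable list of pending task names; both are assumptions made for illustration, as are the names `LocalityLookupSketch`, `pickTask`, and `forget`. Unlike the Hadoop code, it eagerly removes a chosen task from every cache entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class LocalityLookupSketch {
    /** Parent of a topology path, e.g. "/rack1/hostA" -> "/rack1"; null above the top level. */
    static String parent(String path) {
        int slash = path.lastIndexOf('/');
        return slash <= 0 ? null : path.substring(0, slash);
    }

    /** Removes a scheduled task from every entry and drops entries that become empty. */
    static void forget(Map<String, List<String>> cache, String task) {
        Iterator<Map.Entry<String, List<String>>> it = cache.entrySet().iterator();
        while (it.hasNext()) {
            List<String> tasks = it.next().getValue();
            tasks.remove(task);
            if (tasks.isEmpty()) {
                it.remove();
            }
        }
    }

    /** Walks from the tracker's node toward the root; returns the first pending task, or null. */
    static String pickTask(Map<String, List<String>> nonRunning, String trackerPath, int maxLevel) {
        String key = trackerPath;
        for (int level = 0; level < maxLevel && key != null; level++) {
            List<String> tasks = nonRunning.get(key);
            if (tasks != null && !tasks.isEmpty()) {
                String task = tasks.get(0);
                forget(nonRunning, task);
                return task;
            }
            key = parent(key);               // cache miss: widen the search by one level
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, List<String>> cache = new HashMap<String, List<String>>();
        cache.put("/rack1/hostA", new ArrayList<String>(Arrays.asList("m0")));
        cache.put("/rack1", new ArrayList<String>(Arrays.asList("m0", "m1")));
        System.out.println(pickTask(cache, "/rack1/hostA", 2)); // node-local hit
        System.out.println(pickTask(cache, "/rack1/hostB", 2)); // falls back to the rack
    }
}
```

Passing a smaller maxLevel restricts the search to closer levels, which mirrors how maxCacheLevel limits scheduling to node-local or rack-local tasks in the listing.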


// 3. Search non-local tips for a new task
tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
if (tip != null) {
  // Add to the running list
  scheduleMap(tip);

  LOG.info("Choosing a non-local task " + tip.getTIPId());
  return tip.getIdWithinJob();
}


// 2) Running TIP :
if (hasSpeculativeMaps) {
  long currentTime = jobtracker.getClock().getTime();

  // 1. Check bottom up for speculative tasks from the running cache
  if (node != null) {
    Node key = node;
    for (int level = 0; level < maxLevel; ++level) {
      Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
      if (cacheForLevel != null) {
        tip = findSpeculativeTask(cacheForLevel, tts,
                                  avgProgress, currentTime, level == 0);
        if (tip != null) {
          if (cacheForLevel.size() == 0) {
            runningMapCache.remove(key);
          }
          return tip.getIdWithinJob();
        }
      }
      key = key.getParent();
    }
  }

  // 2. Check breadth-wise for speculative tasks
  for (Node parent : nodesAtMaxLevel) {
    // ignore the parent which is already scanned
    if (parent == nodeParentAtMaxLevel) {
      continue;
    }

    Set<TaskInProgress> cache = runningMapCache.get(parent);
    if (cache != null) {
      tip = findSpeculativeTask(cache, tts, avgProgress,
                                currentTime, false);
      if (tip != null) {
        // remove empty cache entries
        if (cache.size() == 0) {
          runningMapCache.remove(parent);
        }
        LOG.info("Choosing a non-local task " + tip.getTIPId()
                 + " for speculation");
        return tip.getIdWithinJob();
      }
    }
  }
}


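(3) Reduce Task selection policy
Because Reduce Tasks have no data locality, their scheduling policy is very simple compared with that of Map Tasks. The JobInProgress object keeps two data structures for them, nonRunningReduces and runningReduces, which hold the TIPs that have not yet run and the TIPs that are currently running, respectively.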

/**
 * Return a ReduceTask, if appropriate, to run on the given tasktracker.
 * We don't have cache-sensitivity for reduce tasks, as they
 * work on temporary MapRed files.
 */
public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
                                             int clusterSize,
                                             int numUniqueHosts
                                            ) throws IOException {
  if (status.getRunState() != JobStatus.RUNNING) {
    LOG.info("Cannot create task split for " + profile.getJobID());
    return null;
  }

  /** check to see if we have any misbehaving reducers. If the expected output
   * for reducers is huge then we just fail the job and error out. The estimated
   * size is divided by 2 since the resource estimator returns the amount of disk
   * space the that the reduce will use (which is 2 times the input, space for merge + reduce
   * input). **/
  long estimatedReduceInputSize = resourceEstimator.getEstimatedReduceInputSize()/2;
  if (((estimatedReduceInputSize) >
      reduce_input_limit) && (reduce_input_limit > 0L)) {
    // make sure jobtracker lock is held
    LOG.info("Exceeded limit for reduce input size: Estimated:" +
        estimatedReduceInputSize + " Limit: " +
        reduce_input_limit + " Failing Job " + jobId);
    status.setFailureInfo("Job exceeded Reduce Input limit "
        + " Limit:  " + reduce_input_limit +
        " Estimated: " + estimatedReduceInputSize);
    jobtracker.failJob(this);
    return null;
  }

  // Ensure we have sufficient map outputs ready to shuffle before
  // scheduling reduces
  if (!scheduleReduces()) {
    return null;
  }

  int target = findNewReduceTask(tts, clusterSize, numUniqueHosts,
                                 status.reduceProgress());
  if (target == -1) {
    return null;
  }

  Task result = reduces[target].getTaskToRun(tts.getTrackerName());
  if (result != null) {
    addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
  }

  return result;
}


/**
 * Find new reduce task
 * @param tts The task tracker that is asking for a task
 * @param clusterSize The number of task trackers in the cluster
 * @param numUniqueHosts The number of hosts that run task trackers
 * @param avgProgress The average progress of this kind of task in this job
 * @return the index in tasks of the selected task (or -1 for no task)
 */
private synchronized int findNewReduceTask(TaskTrackerStatus tts,
                                           int clusterSize,
                                           int numUniqueHosts,
                                           double avgProgress) {
  if (numReduceTasks == 0) {
    if(LOG.isDebugEnabled()) {
      LOG.debug("No reduces to schedule for " + profile.getJobID());
    }
    return -1;
  }

  String taskTracker = tts.getTrackerName();
  TaskInProgress tip = null;

  // Update the last-known clusterSize
  this.clusterSize = clusterSize;

  if (!shouldRunOnTaskTracker(taskTracker)) {
    return -1;
  }

  // 1. check for a never-executed reduce tip
  // reducers don't have a cache and so pass -1 to explicitly call that out
  tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
  if (tip != null) {
    return tip.getIdWithinJob();
  }

  // 2. check for a reduce tip to be speculated
  if (hasSpeculativeReduces) {
    tip = findSpeculativeTask(runningReduces, tts, avgProgress,
                              jobtracker.getClock().getTime(), false);
    if (tip != null) {
      return tip.getIdWithinJob();
    }
  }

  return -1;
}
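
The two-step order in findNewReduceTask (a never-run task first, a backup task only afterwards) can be sketched as follows. This is a simplified stand-in rather than Hadoop's logic: `ReduceSelectionSketch`, `Tip`, and `select` are hypothetical, and the speculation test used here (progress lagging the job average by more than a caller-supplied `gap`) is an assumption made for the example, not the criteria of findSpeculativeTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReduceSelectionSketch {
    /** Hypothetical stand-in for a reduce TIP: an id plus its progress in [0, 1]. */
    static final class Tip {
        final int id;
        final double progress;
        Tip(int id, double progress) { this.id = id; this.progress = progress; }
    }

    /** Returns the id of the chosen task, or -1 when nothing should be scheduled. */
    static int select(List<Tip> nonRunning, List<Tip> running,
                      boolean speculate, double avgProgress, double gap) {
        // 1. Prefer a reduce task that has never been executed.
        if (!nonRunning.isEmpty()) {
            return nonRunning.remove(0).id;
        }
        // 2. Otherwise, optionally launch a backup for a lagging running task.
        if (speculate) {
            for (Tip t : running) {
                if (t.progress < avgProgress - gap) {
                    return t.id;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Tip> nonRunning = new ArrayList<Tip>(Arrays.asList(new Tip(2, 0.0)));
        List<Tip> running = Arrays.asList(new Tip(0, 0.9), new Tip(1, 0.1));
        System.out.println(select(nonRunning, running, true, 0.5, 0.2)); // never-run task
        System.out.println(select(nonRunning, running, true, 0.5, 0.2)); // lagging task
    }
}
```

No topology lookup appears anywhere in the sketch, which is the practical consequence of Reduce Tasks having no data locality.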

1) Validity check. As with Map Tasks, the node's reliability and disk space are checked.

if (!shouldRunOnTaskTracker(taskTracker)) {
  return -1;
}


// 1. check for a never-executed reduce tip
// reducers don't have a cache and so pass -1 to explicitly call that out
tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
if (tip != null) {
  return tip.getIdWithinJob();
}


// 2. check for a reduce tip to be speculated
if (hasSpeculativeReduces) {
  tip = findSpeculativeTask(runningReduces, tts, avgProgress,
                            jobtracker.getClock().getTime(), false);
  if (tip != null) {
    return tip.getIdWithinJob();
  }
}





