概述
task任务执行位置优先级策略
- 1.概述
- 2.RDD创建
- 2.job划分stage
- 3.stage划分task
- 3.1.RDD分区位置信息集合获取
- 3.1.1.位置信息封装类分析
- 3.1.1.1.TaskLocation
- 3.1.1.2.ExecutorCacheTaskLocation
- 3.1.1.3.HostTaskLocation
- 3.1.1.4.HDFSCacheTaskLocation
- 3.1.2.getCacheLocs-获取数据在executor内存中的位置信息
- 3.1.3.preferredLocations-获取RDD分区位置信息字符串集合
- 3.1.3.1.常见RDD子类获取位置信息函数的实现
- 3.1.3.1.1.HadoopRDD
- 3.1.3.1.2.MapPartitionsRDD
- 3.1.3.1.3.ShuffledRDD
- 3.1.3.1.4.JdbcRDD
- 4.task执行
- 4.1.创建taskSetManager管理taskSet
- 4.1.1.TaskSetManager实例化
- 4.1.1.1.任务task执行的5中级别
- 4.1.1.2.taskSet中任务添加到对应待处理任务集中
- 4.1.1.3.计算taskSet中使用的位置级别
- 4.1.2.getAllowedLocalityLevel-获取跟据延迟策略可以启动任务的级别
- 4.1.3.1.每个位置级别允许的间隔时间
- 4.1.3.resourceOffer获取executor上需要执行的任务集合
- 4.1.3.1.dequeueTask-获取节点待处理任务
- 4.2.taskSheduler申请资源
- 4.2.1.入口
- 4.2.2.TaskSchedulerImpl#resourceOffers申请资源
- 4.2.2.1.TaskSchedulerImpl#resourceOfferSingleTaskSet
- 5.总结
- 5.1.流程总结
- 5.2.位置优先级策略总结
- 6.参考资料
1.概述
从RDD的创建到driver申请资源分发任务,以task任务执行时的位置优先选择为主线进行分析;spark版本2.11
2.RDD创建
下面这段话引自弹性分布式数据集 (RDDs):
Spark 围绕弹性分布式数据集(RDD) 的概念展开,RDD 是可以并行操作的元素的容错集合。创建 RDD 有两种方法:并行化 驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。
针对RDD创建方式的详情请见【Spark源码-spark算子-1-构建RDD的算子】
2.job划分stage
在DAGScheduler#getMissingParentStages()中定义了job任务划分为stage依赖链的逻辑:从最后一个stage的最后一个rdd开始根据rdd依赖向上追溯,遇到宽依赖就划分stage;
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
//初始化等待划分stage的rdd堆栈数组
val waitingForVisit = new ArrayStack[RDD[_]]
//rdd链的stage划分
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {//没有参与划分的rdd
//标识已经参与划分的rdd
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
//遍历当前rdd的所有依赖关系
for (dep <- rdd.dependencies) {
dep match {
//宽依赖:构建ShuffleMapStage的实例化对象mapStage
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
//窄依赖:将当前rdd的依赖rdd放入堆栈数组heap位置
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
//将stage的最后一个rdd放入堆栈数组
waitingForVisit.push(stage.rdd)
//从后向前便利stage中所有rdd
while (waitingForVisit.nonEmpty) {
//rdd链的stage划分:从堆栈数组总取出heap元素===>实现rdd向上遍历能力
visit(waitingForVisit.pop())
}
missing.toList
}
}
3.stage划分task
在DAGScheduler#submitMissingTasks()中定义了stage任务根据分区数划分task集合的逻辑;
调用DAGScheduler#getPreferredLocs()方法获取stage中最后一个RDD每个分区的位置信息集合;
根据stage分区,每个分区构建一个task,每个task在构建上传入该分区位置信息集合;
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
//获取stage的分区数量
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// 其他代码。。。
//分区id ---> 位置信息集合的映射
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
//通过getPreferredLocs()方法获取分区位置信息集合;最后返回stage(分区id-->分区位置信息集合)映射的集合
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $en${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
// 其他代码。。。
//将同一个stage中的按照分区封装成一个task集合(一个分区一个task)
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
//ShuffleMapStage构建shuffleTask
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
//task任务执行的位置列表
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
//在构建task时,传入task任务执行的位置列表
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
//ResultStage构建resultTask
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
//task任务执行的位置列表
val locs = taskIdToLocations(id)
//在构建task时,传入task任务执行的位置列表
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $en${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
//将task集合封装为taskSet,然后提交到taskSchedulee
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
//其他代码。。。
}
}
}
3.1.RDD分区位置信息集合获取
1、分区数据存在缓存中,使用缓存的位置信息集合;
2、分区数据没有存在缓存中,调用rdd的preferredLocations函数,获取rdd分区的位置字符串列表;然后根据位置信息列表,调用TaskLocation#apply(str)函数,封装为对应的TaskLocation实现类对象;最后返回位置信息(TaskLocation对象)集合;
3、针对窄依赖,没有位置优先级,向上追溯父RDD,直到遇到父RDD存在位置优先级(重写了preferredLocations函数),从该父RDD第一个分区开始轮询获取分区位置信息,如果第一个分区存在位置信息,使用第一个分区的位置信息,否则继续下一个分区位置信息获取,直到获取的位置信息并返回;
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
//new一个HashSet标识已经获取过位置信息的rdd分区
getPreferredLocsInternal(rdd, partition, new HashSet)
}
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// 已经获取过位置信息的rdd分区,不在重复获取
if (!visited.add((rdd, partition))) {
return Nil
}
// 从缓存中获取数据
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
//存在缓存,直接返回从缓存中获取的位置信息(TaskLocation对象)集合
return cached
}
// 没有缓存,调用rdd的preferredLocations函数,获取rdd分区的位置字符串列表
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
//根据位置信息列表,调用TaskLocation#apply(str)函数,封装为对应的TaskLocation实现类对象;最后返回位置信息(TaskLocation对象)集合
return rddPrefs.map(TaskLocation(_))
}
// 针对窄依赖,没有位置优先级,向上追溯父RDD,直到遇到父RDD存在位置优先级(重写了preferredLocations函数),从该父RDD第一个分区开始轮询获取分区位置信息,如果第一个分区存在位置信息,使用第一个分区的位置信息,否则继续下一个分区位置信息获取,直到获取的位置信息并返回;
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
}
3.1.1.位置信息封装类分析
3.1.1.1.TaskLocation
位置信息封装类顶级trait;
定义位置信息的必要信息host;
定义数据存储在hdfs、executor内存中时,位置信息字符串的前缀:存储在hdfs --> hdfs_cache_;存储在executor内存中 --> executor_;
提供根据位置信息字符串前缀创建TaskLocation的不同实现类对象的能力;
private[spark] sealed trait TaskLocation {
//必要信息host
def host: String
}
private[spark] object TaskLocation {
//用这个前缀来标识缓存hdfs块的主机位置:数据缓存在hdfs中
val inMemoryLocationTag = "hdfs_cache_"
//用这个前缀标识executor的位置:数据在executor的内存中
val executorLocationTag = "executor_"
//根据host和executorId构造一个ExecutorCacheTaskLocation实例化对象
def apply(host: String, executorId: String): TaskLocation = {
new ExecutorCacheTaskLocation(host, executorId)
}
//根据数据缓存的位置,创建TaskLocation的不同实现类对象
def apply(str: String): TaskLocation = {
val hstr = str.stripPrefix(inMemoryLocationTag)
if (hstr.equals(str)) {//字符串不是以hdfs_cache_开头
if (str.startsWith(executorLocationTag)) {//字符串以executor_开头
val hostAndExecutorId = str.stripPrefix(executorLocationTag)
val splits = hostAndExecutorId.split("_", 2)
require(splits.length == 2, "Illegal executor location format: " + str)
val Array(host, executorId) = splits
new ExecutorCacheTaskLocation(host, executorId)
} else {//字符串既不是以hdfs_cache_开头,也不是以executor_开头
new HostTaskLocation(str)
}
} else {//字符串以hdfs_cache_开头
new HDFSCacheTaskLocation(hstr)
}
}
}
3.1.1.2.ExecutorCacheTaskLocation
标识数据缓存在executor的内存中;
定义位置信息字符串格式:executor_ h o s t _ host_ host_executorId
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
extends TaskLocation {
//executor_$host_$executorId
override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}
3.1.1.3.HostTaskLocation
标识数据在host这个节点的磁盘上;
定义位置信息字符串格式:$host
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = host
}
3.1.1.4.HDFSCacheTaskLocation
标识数据存储在hdfs上面;
定义位置信息字符串格式:hdfs_cache_$host
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = TaskLocation.inMemoryLocationTag + host
}
3.1.2.getCacheLocs-获取数据在executor内存中的位置信息
从executor内存中获取缓存位置信息;
从blockManager中获取block的位置信息,并封装为ExecutorCacheTaskLocation实例化对象;
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
//分区id --> 位置信息集合
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// 已经计算过位置信息的分区不再计算
if (!cacheLocs.contains(rdd.id)) {
val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
// 如果存储级别为NONE,我们不需要从块管理器获取位置
IndexedSeq.fill(rdd.partitions.length)(Nil)
} else {
val blockIds =
rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
//从blockManager获取block块位置信息,并封装为ExecutorCacheTaskLocation对象
blockManagerMaster.getLocations(blockIds).map { bms =>
//根据host和executorId构造一个ExecutorCacheTaskLocation实例化对象
bms.map(bm => TaskLocation(bm.host, bm.executorId))
}
}
cacheLocs(rdd.id) = locs
}
//返回当前rdd所有分区在缓存中的位置信息集合
cacheLocs(rdd.id)
}
}
3.1.3.preferredLocations-获取RDD分区位置信息字符串集合
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
//获取分区位置信息字符串集合:不同子类RDD由不同实现
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
//判断是否使用了checkpoint检查点,如果有检查点直接从检查点获取位置偏好,因为数据在检查点上,如果是没有设置检查点,则直接调用getPreferredLocations函数获取位置集合
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
}
3.1.3.1.常见RDD子类获取位置信息函数的实现
1、hadoopRDD通过判断分区的对应分片各个副本的状态,将位置封装为HDFSCacheTaskLocation或HostTaskLocatio,然后返回位置字符串,最后将分区各个副本的位置字符串放在一个集合返回;
2、mapPartitonsRDD和jdbcRDD都没有重写获取位置优先级函数,使用其父类getPreferredLocations方法实现protected def getPreferredLocations(split: Partition): Seq[String] = Nil
;
3、shuffleRDD是通过输出追踪器获取位置信息;
3.1.3.1.1.HadoopRDD
sequenceFile、hadoopRDD等算子调用后返回HadoopRDD实例化对象;
针对hadoopRdd,获取分区对应分片各个副本的位置信息;
判断分片副本状态,处于内存中,封装为spark的HDFSCacheTaskLocation对象,并返回字符串;非内存(在磁盘)中,封装为HostTaskLocation对象,返回字符串;
最终,返回分区位置信息字符串集合;
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
override def getPreferredLocations(split: Partition): Seq[String] = {
//获取分区对应分片信息
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs = hsplit match {
case lsplit: InputSplitWithLocationInfo =>
//位置信息转换
HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
case _ => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
}
private[spark] object HadoopRDD extends Logging {
private[spark] def convertSplitLocationInfo(
infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
//处理分片位置信息
Option(infos).map(_.flatMap { loc =>
//获取位置信息
val locationStr = loc.getLocation
if (locationStr != "localhost") {
//判断hdfs中数据在内存还是磁盘中
if (loc.isInMemory) {
//内存中,创建HDFSCacheTaskLocation对象,返回字符串
logDebug(s"Partition $locationStr is cached by Hadoop.")
Some(HDFSCacheTaskLocation(locationStr).toString)
} else {
//在磁盘中,创建HostTaskLocation对象,返回字符串
Some(HostTaskLocation(locationStr).toString)
}
} else {
None
}
})
}
}
3.1.3.1.2.MapPartitionsRDD
wholeTextFiles、textFile、map、filter等算子调用后返回MapPartitionsRDD实例化对象;
如下图所示,MapPartitionsRDD中并没有重写getPreferredLocations方法,使用其父类getPreferredLocations方法实现protected def getPreferredLocations(split: Partition): Seq[String] = Nil
;
由于map/filter这类操作不存在位置偏好,在spark调度时发现getPreferredLocations = Nil后会不断向上查找血缘,直到找到实现getPreferredLocations的RDD
3.1.3.1.3.ShuffledRDD
reduceByKey 、groupByKey 等shuffle类算子调用后返回ShuffledRDD实例化对象;
说明:计算流程在shuffle过程中,涉及将数据spill(溢写)到磁盘以及dag划分stage操作;
以上操作可能导致通过血缘追溯无法找到溢写的磁盘位置;
在spark中,输出追踪器mapOutputTracker管理各个shuffleMapTask的输出数据;reduce任务可以根据mapOutputTracker提供的信息决定从哪些executor中获取需要的map输出信息;
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
//从sparkEnv中获取输出追踪器
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
//从输出追踪器中获取位置信息
tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
}
3.1.3.1.4.JdbcRDD
JdbcRDD:jdbc等需要远程获取的rdd;
如下图所示,JdbcRDD中并没有重写getPreferredLocations方法,使用其父类getPreferredLocations方法实现protected def getPreferredLocations(split: Partition): Seq[String] = Nil
;
对于jdbc这类需要远程访问的rdd,无论那个位置都是一样的;
4.task执行
在dag完成stage划分后并根据分区将stage的分区封装为task数组,最后会将task数组转换为taskSet,并将该taskSet提交到taskScheduler,由taskScheduler#submitTasks()方法处理;
task在taskSchedule中的执行详情请参考【Spark源码-任务提交流程-8-DAGScheduler任务切分-3.taskScheduler调用】;
接下来主要分析task任务执行过程中资源申请策略;
4.1.创建taskSetManager管理taskSet
taskScheduler为每个taskSet创建一个taskSetManager;
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging {
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//为每一个taskSet创建一个taskSetManager对象
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//其他代码。。。。
}
//交给backend处理
backend.reviveOffers()
}
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
}
}
4.1.1.TaskSetManager实例化
在taskSetManager实力化过程中,将taskSet中的task进行了分流,将task交给不同的对象来执行:executor、主机、机架;
4.1.1.1.任务task执行的5中级别
PROCESS_LOCAL: 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。
NODE_LOCAL: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取
NO_PREF: 数据从哪里访问都一样快,不需要位置优先
RACK_LOCAL: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢
ANY: 数据在非同一机架的网络上,速度最慢
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
condition <= constraint
}
}
4.1.1.2.taskSet中任务添加到对应待处理任务集中
1、taskSetManager实例化时,在taskSetManager对象中维护了5个任务集,分别为executor的待处理任务集pendingTasksForExecutor,主机的待处理任务集pendingTasksForExecutor,机架的待处理任务集pendingTasksForRack,没有位置优先级要求的待处理任务pendingTasksWithNoPrefs,所有的待处理任务集allPendingTasks;
2、实例化过程中,轮训taskSet中所有task,根据task的位置实现类类型,将task的索引index添加到对应任务集:
ExecutorCacheTaskLocation类型位置,task索引加入位置所在executor的待处理任务集中;
HDFSCacheTaskLocation类型位置,获取位置所在主机(节点)上可用executors,将task索引加入每个executor的待处理任务集;
将task的索引加入位置所在主机(节点)的待处理任务集;
将task的索引加入位置所在机架的待处理任务集;
如果task对数据位置没有优先级要求,将task索引index加入没有位置优先级要求的待处理任务集;
将task的索引加入所有的待处理任务集allPendingTasks;
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
//每个executor的待处理任务集
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
//每个主机的待处理任务集
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
//每个机架的待处理任务集
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
//没有位置优先级要求的待处理任务
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
//所有的待处理任务集
private val allPendingTasks = new ArrayBuffer[Int]
val tasks = taskSet.tasks
val numTasks = tasks.length
//轮训taskSet中所有task
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}
private[scheduler] var myLocalityLevels = computeValidLocalityLevels()
private[spark] def addPendingTask(index: Int) {
//轮训索引index对应task的所有位置信息
for (loc <- tasks(index).preferredLocations) {
//匹配位置信息
loc match {
case e: ExecutorCacheTaskLocation =>
//针对ExecutorCacheTaskLocation位置,将task索引index加入executor的待处理任务集
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
//针对HDFSCacheTaskLocation,获取位置所在节点上可用的executor
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
//如果节点存在可用executor,轮训executor,将task索引index加入该executor的待处理任务集
case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
//将task索引index加入位置所在主机(节点)的待处理任务集
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
//将task索引index加入放置【位置所在主机(节点)】的机架的待处理任务集
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
//如果task对数据位置没有优先级要求,将task索引index加入没有位置优先级要求的待处理任务集
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
//将task索引index加入所有的待处理任务集
allPendingTasks += index
}
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}
}
4.1.1.3.计算taskSet中使用的位置级别
根据待处理任务集和可用对象定义taskSet的位置级别;
最后返回taskSet的位置级别数组;
位置级别数组中的数据添加先后顺序:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY;
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
//每个executor的待处理任务集
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
//每个主机的待处理任务集
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
//每个机架的待处理任务集
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
//没有位置优先级要求的待处理任务
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
//所有的待处理任务集
private val allPendingTasks = new ArrayBuffer[Int]
//暴露当前taskSet的位置级别
private[scheduler] var myLocalityLevels = computeValidLocalityLevels()
//计算此任务集中使用的位置级别
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
//executor待处理任务集不为空且executor可用,该taskSet是PROCESS_LOCAL级别的
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
//主机待处理任务集不为空且主机上存在可用executor,该taskSet是NODE_LOCAL级别的
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
//没有位置优先级要求的待处理任务集不为空,该taskSet是NO_PREF级别的
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
//机架待处理任务集不为空且机架上存在可用主机,该taskSet是RACK_LOCAL
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
//所有的taskSet都是ANY级别的
levels += ANY
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
//返回位置级别数组:一个taskSet可能是多种级别的
levels.toArray
}
}
4.1.2.getAllowedLocalityLevel-获取跟据延迟策略可以启动任务的级别
1、轮询当前taskSetManager所有的位置级别,从0开始;
2、判断当前索引对应级别的任务待处理集中,是否存在需要调度执行的任务,有的话就返回true,没有就将任务集信息从队列中移除;
3、针对由任务需要执行的位置级别,判断该级别与上传执行getAllowedLocalityLevel方法的间隔时间是否超过允许时间,超过则将位置级别索引+1,继续轮询;否则返回当前位置级别;
总结:
根据当前taskSEtManager的位置级别数组从低到高判断是否符合执行要求;
判断依据:
1、该级别存在待执行任务;
2、该级别2次执行getAllowedLocalityLevel方法的间隔时间不能超过允许时间(默认3s);
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
//当前位置级别索引
private var currentLocalityIndex = 0
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
// 判断任务集中是否由任务需要调度执行,并移除已经参与过调度或执行成功的任务
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
//重试次数为0,并且没有执行成功的记录;代表该任务没有执行过,需要调度执行
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
//针对已经调度过或执行成功的任务,从任务集中移除
pendingTaskIds.remove(indexOffset)
}
}
false
}
//判断队列中是否有任务需要调度执行
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
//轮询队列,判断是否有需要调度执行的任务
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
//判断任务是否需要调度执行
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
// 针对没有任务需要调度执行的executor/host/rack,将其从对列中移除
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}
//轮询当前taskSetManager中所有位置级别:从索引0开始
while (currentLocalityIndex < myLocalityLevels.length - 1) {
//判断当前级别的任务队列中,是否有任务需要执行
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
if (!moreTasks) {//当前索引对应级别没有任务需要执行,索引+1;
//将当前时间设置为这个级别最后一次调用时间
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
}
//当前时间与上次调用getAllowedLocalityLevel方法的时间间隔大于当前级别等待时间
else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
//将getAllowedLocalityLevel最后调用时间加上当前级别调用间隔
lastLaunchTime += localityWaits(currentLocalityIndex)
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
//位置级别索引+1
currentLocalityIndex += 1
} else {//当前级别存在任务需要执行,且与上次调用此方法的时间间隔在允许范围内
//返回当前位置级别信息
return myLocalityLevels(currentLocalityIndex)
}
}
//返回当前taskSetManager位置级别集合中最高索引的位置级别
myLocalityLevels(currentLocalityIndex)
}
}
4.1.3.1.每个位置级别允许的间隔时间
可以通过参数设置不同位置级别两次延迟策略的间隔时间;
所有级别的默认延迟间隔时间都是3s;
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get(config.LOCALITY_WAIT)
//根据不同级别匹配不同参数
val localityWaitKey = level match {
case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}
if (localityWaitKey != null) {
conf.getTimeAsMs(localityWaitKey, defaultWait.toString)
} else {
0L
}
}
}
package object config {
//默认等待事件3s
private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
}
4.1.3.resourceOffer获取executor上需要执行的任务集合
返回的是任务描述对象的集合;
从executor待处理任务集中获取待处理任务,获取后将任务从待处理任务集中移除;然后根据获取的任务信息组装任务描述对象;
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
//黑名单判断
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
blacklist.isNodeBlacklistedForTaskSet(host) ||
blacklist.isExecutorBlacklistedForTaskSet(execId)
}
if (!isZombie && !offerBlacklisted) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
//获取可以启动任务的任务级别
allowedLocality = getAllowedLocalityLevel(curTime)
//启动任务的位置级别不能高于请求的位置级别
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
//获取executor待处理任务信息(获取后将任务信息从待处理任务集中移除),组装任务描述信息
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
val serializedTask: ByteBuffer = try {
ser.serialize(task)
} catch {
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
addRunningTask(taskId)
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
sched.dagScheduler.taskStarted(task, info)
new TaskDescription(
taskId,
attemptNum,
execId,
taskName,
index,
task.partitionId,
addedFiles,
addedJars,
task.localProperties,
serializedTask)
}
} else {
None
}
}
}
4.1.3.1.dequeueTask-获取节点待处理任务
从给定executor的待处理任务队列中取出任务,并返回其索引和位置级别;
按照executor、host、no_pref、rack、any的顺序获取,前面如果获取到了,直接返回,后面的不在获取;
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
//从executor待处理任务集中取出任务索引,并以tuple结构返回任务信息
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
//从主机(节点)待处理任务集中取出任务索引,并以tuple结构返回任务信息
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
//从位置优先级要求的待处理任务集中取出任务索引,并以tuple结构返回任务信息
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}
//从机架待处理任务集中取出任务索引,并以tuple结构返回任务信息
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL, false))
}
}
//从所有的待处理任务集中取出任务索引,并以tuple结构返回任务信息
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
}
}
//如果所有其他任务都已安排,找到一个推测任务
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
}
4.2.taskSheduler申请资源
在实例化taskSetManager过程中,对taskSet进行了分流,所有具备执行task任务的对象(executor、主机、机架),在其待处理任务集中都添加了该task;这种情况下,存在多个队列中含有同一个任务的情况,那么task究竟在那个对象上执行,就是接下来要分析的内容;
在taskScheduler任务执行流程中,backend将任务推个driver节点,driver节点申请资源,并分配任务到对应资源的executor执行;详情参考【Spark源码-任务提交流程之-9-TaskScheduler任务调度器】;
下面就driver节点申请资源进行分析;
4.2.1.入口
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
private def makeOffers() {
// 确保执行器executor可用:资源申请
val taskDescs = withLock {
// 过滤出可用executor
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
//封装executor上可用空闲资源
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
//资源申请
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
//根据分配的资源启动任务执行
launchTasks(taskDescs)
}
}
}
4.2.2.TaskSchedulerImpl#resourceOffers申请资源
1、完善executor与host、host与rack、host与executor,executor与task的映射关系;
2、通过位置优先级策略,选择task任务执行的executorid;并封装在任务描述对象中;
3、返回任务描述对象集合;
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging {
// 每个执行器上运行的任务ids
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
// 每个主机上的执行器集;
// 它用于计算hostsAlive,进而用于决定何时可以获得给定主机上的数据位置
protected val hostToExecutors = new HashMap[String, HashSet[String]]
// 每个机架上的主机集
protected val hostsByRack = new HashMap[String, HashSet[String]]
// executor与主机的映射关系
protected val executorIdToHost = new HashMap[String, String]
//offers:封装可用执行器的相关信息
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// 将每个slave标记为活着的,并记住它的主机名
//也跟踪是否添加了新的执行器
var newExecAvail = false
for (o <- offers) {
//初始化executor所在主机的执行器集
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
//维护executor所在主机的执行器集
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
//完善executor与主机的映射关系
executorIdToHost(o.executorId) = o.host
//初始化executor的任务集
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
//维护机架的主机集
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// 将黑名单过期的节点从黑名单中删除
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
//过滤出非黑名单内executor
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)
//打乱executor集合的顺序
val shuffledOffers = shuffleOffers(filteredOffers)
// 构建一个分配给每个executor的任务列表
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
// 每个executor的空闲cpu核数
val availableCpus = shuffledOffers.map(o => o.cores).toArray
//调度池中所有taskSetManager
val sortedTaskSets = rootPool.getSortedTaskSetQueue
//向调度池中当前taskSetManager中添加执行器
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {//判断是否当前taskSetManager
taskSet.executorAdded()
}
}
//轮询调度池中taskSetManager
for (taskSet <- sortedTaskSets) {
//可用插槽数量
val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
// 针对barrier taskSetManager,如果可用插槽数量小于待处理任务数,跳过该taskSetManager
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
s"number of available slots is $availableSlots.")
} else {
var launchedAnyTask = false
// 记录分配给tasks的所有executorId
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
//轮询当前taskSet的位置级别集合
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
//完成当前TaskSetManager当前位置级别的任务资源分配并返回是否存在待处理任务
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
//其他代码。。。。
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
//返回任务描述对象集合
return tasks
}
}
4.2.2.1.TaskSchedulerImpl#resourceOfferSingleTaskSet
将单个taskSetManager中当前位置级别需要执行的任务绑定到对应的执行器上面,并通过addressesWithDescs参数实现结果返回;
并返回当前taskSetManager是否有task需要执行;
通过轮询所有可用executor,获取该executor所有需要执行的任务集合,将该任务集合通过添加到tasks参数实现结果返回;
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging {
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
//默认当前位置级别没有任务需要执行
var launchedTask = false
//轮询所有可用executor
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
//当前executor上cpu够用
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
//获取executor上当前位置级别需要执行的任务集合,然后轮询任务集合
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
//将taskDesc对象添加到当前executor的待执行任务集合中
tasks(i) += task
val tid = task.taskId
//缓存任务相关映射信息
taskIdToTaskSetManager.put(tid, taskSet)
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
//executor上空闲cpu资源减少
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
// Only update hosts for a barrier task.
if (taskSet.isBarrier) {
//将当前任务绑定到执行任务的executor上
addressesWithDescs += (shuffledOffers(i).address.get -> task)
}
//标识当前位置级别有任务需要执行
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}
}
5.总结
5.1.流程总结
1、rdd中数据根据不同分区策略,会有不同的分区,每个分区存储rdd一部分数据;rdd是一个分布式数据,为了保障数据不丢失,每个分区都会有多个副本存在不同的节点中;在rdd计算过程中,为了提升计算的速度,rdd采用移动计算不移动数据的策略,即在分区最近的副本上做计算;那么多个副本,究竟使用那个副本,就涉及到了rask执行的优先位置选择;
2、rdd在创建/转换过程中,都涉及分区的划分;
3、在dag划分stage后,会将stage根据分区划分为不同的task,一个stage对应一个task数组,最后会将task数组封装为taskSet提交到taskScheduler;
4、在stage划分task时,会获取该task对应分区的副本位置集合,在构建task时传入task中;
5、taskScheduler在处理dag提交过来的taskSet时,会构建一个taskSetManager对象,通过该对象来对任务进行管理;
6、在taskSetManager实例化过程中,会将taskSet中所有任务加入executor、host、rack、no_pref、any 5个待处理任务集中;
7、taskScheduler后面会将任务提交到driver节点,driver节点先获取任务信息后,再讲任务进行分配到具体的executor;
8、driver节点获取任务信息,就是调用TaskSchedulerImpl#resourceOffers方法获取需要执行的任务描述信息,在任务描述信息里面维护了每个任务在那个executor上执行;
5.2.位置优先级策略总结
1、共有5中位置级别:PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY;优先级依次递减;
2、一个taskSetManager中,每个级别延迟间隔时间默认不超过3s;
3、将任务添加到可执行任务的executor、host、no_pref、rack、any的待处理任务集总;
4、调用TaskSchedulerImpl#resourceOffers方法获取需要执行的任务描述信息时,会轮训具备空闲资源的executor,根据PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY优先级别依次从executor的对应级别待处理任务集中取出任务封装为任务描述;5中优先级别,如果前面取到待执行任务,则不在向后取,直接返回;
6.参考资料
Spark3.0源码分析-数据本地化优先级及延迟调度策略
RDD 编程指南之弹性分布式数据集 (RDD)
Spark源码-任务提交流程之-9-TaskScheduler任务调度器
最后
以上就是贪玩发带为你收集整理的Spark源码-任务提交流程之-10-task最佳位置算法1.概述2.RDD创建2.job划分stage3.stage划分task4.task执行5.总结6.参考资料的全部内容,希望文章能够帮你解决Spark源码-任务提交流程之-10-task最佳位置算法1.概述2.RDD创建2.job划分stage3.stage划分task4.task执行5.总结6.参考资料所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复