我是靠谱客的博主 野性河马,最近开发中收集的这篇文章主要介绍Spark源码解析03-Submit提交流程及Driver启动流程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1、前言

由前面的文章Spark基础06-Spark client和cluster提交流程我们已经知道了Spark client和cluster提交模式流程

  1. 启动Driver进程,并向集群管理器注册应用程序
  2. 集群资源管理器根据任务配置文件分配并启动Executor
  3. Executor启动之后反向到Driver注册,Driver已经获取足够资源可以运行
  4. Driver开始执行main函数,Spark查询为懒执行,当执行到action算子时开始反向推算,根据宽依赖进行stage的划分,随后每一个stage对应一个taskset,taskset中有多个task,根据本地化原则,task会被分发到指定的Executor去执行
  5. 【一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果】
  6. Executor在执行期间会不断向Driver进行通信,汇报任务运行情况

其中两种提交模式最大的区别就在于Driver的启动位置,client模式driver启动在本地机器,cluster模式driver启动在cluster集群上的节点

下面我们通过追踪源码来验证下这个提交流程

2、Submit shell提交

我们在提交spark任务是通过调用${SPARK_HOME}/bin/spark-submit.sh 脚本提交任务,下面我们给出一个样例

spark-submit 
--master spark://127.0.0.1:7077 
# 集群模式
--deploy-mode cluster 
--driver-memory 1g 
--executor-memory 1g 
--executor-cores 1 
# 启动类
--class org.apache.spark.examples.SparkPi 
# jar包地址
${SPARK_HOME}/examples/jars/spark-examples.jar 

再看下spark-submit.sh脚本,我们可以看到通过调用org.apache.spark.deploy.SparkSubmit,并将我们的参数传入该类

01-submit-shell

3、SparkSubmit提交流程01-SparkSubmit

本文使用spark源码版本为 2.3.4 ,进入SparkSubmit类,我们直奔主题,找到SparkSubmit.main()方法

02-SparkSubmit-main

main()方法将入参进行封装为SparkSubmitArguments(),然后根据参数的类型执行相应的方法,这边我们主要跟踪提交流程

进入SparkSubmit.submit()方法,可以看到先定义了doRunMain()方法,然后进入判断,这边我们可以看到无论是走哪个分支最终还是调用了doRunMain()方法

02-SparkSubmit-submit

进入doRunMain()方法,可以看到有两个分支,一个是使用代理执行runMain()方法,一个是直接执行runMain()

02-SparkSubmit-submit-doRunMain

进入SparkSubmit.runMain(),这边我们要关注下注释,“使用submit参数运行子类的main法”,分为两个步骤

  1. 准备运行环境,设置适当的类路径,系统实行以及应用参数等为了运行 the child main class,基于集群管理器和部署模式
  2. 使用这个启动环境来调用 the main method of the child main class

从注释我们可以知道,runMain除了准备环境以外,我们需要关注的就是部署模式(deployMode)以及属性childMainClass

从源码我们可以看到prepareSubmitEnvironment(args)方法,返回一个tuple4,其中有我们需要关注的childMainClass属性

02-SparkSubmit-submit-runMain01

进入SparkSubmit.prepareSubmitEnvironment()方法,可以看到该方法只是一个过渡,实际是调用了doPrepareSubmitEnvironment()方法,这里注释也对了返回tuple4的说明,

//childArgs:the arguments for the child process 
//childClasspath:a list of classpath entries for the child 
//sparkConf:a map of system properties 
//childMainClass:the main class for the child 

02-SparkSubmit-submit-prepareSubmitEnvironment

进入SparkSubmit.doPrepareSubmitEnvironment(),我们先看下这个方法整整537行代码,这里我们就不一一分析方法的作用,我们只关注的核心点:deployMode以及childMainClass

02-SparkSubmit-doPrepareSubmitEnvironment01

SparkSubmit.doPrepareSubmitEnvironment(),大概612行的位置,可以看到如果部署模式deployMode是client模式,则childMainClass赋值为 args.mainClass

#这里的agrs.mainClass就是我们 submit提交的--class 参数
spark-submit 
--class org.apache.spark.examples.SparkPi 

02-SparkSubmit-doPrepareSubmitEnvironment02

下面我们到662行的位置,本篇文章采用的提交方式是spark-cluster方式,既:StandaloneCluster,下面我们简化下代码逻辑

// 是否使用rest方式
if (args.useRest) {
   // 不符合本次逻辑忽略
   ......
  } else {
	// childMainClass 赋值
    childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
    // childArgs 赋值
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }

从代码逻辑可以看到,最终childMainClass被赋值为STANDALONE_CLUSTER_SUBMIT_CLASS,此外,childArgs也被赋值了一些数值

02-SparkSubmit-doPrepareSubmitEnvironment03

下面我们看下STANDALONE_CLUSTER_SUBMIT_CLASS的赋值,将ClientApp的全类名赋值给STANDALONE_CLUSTER_SUBMIT_CLASS变量

02-SparkSubmit-doPrepareSubmitEnvironment04

下面我们看下ClientApp,是SparkApplication的子类

03-ClientApp

我们继续回到doRunMain()方法,第850行左右的位置,我们可以看到将childMainClass通过反射的方式创建了mainClass

02-SparkSubmit-submit-runMain03

下面我们继续跟着mainClass,来到第877左右的位置,下面我们简化下代码逻辑

// 判断mainClass是否为SparkApplication类型
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // 如果是SparkApplication类型,则创建实例
    mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      // 非SparkApplication类型
      // 既Client模式提交的就会走该分支,mainClass 则是 --class 带的全类名
      new JavaMainApplication(mainClass)
    }

 // 调用实例的start()方法并传入参数
 app.start(childArgs.toArray, sparkConf)

02-SparkSubmit-submit-runMain04

从上面代码逻辑可以知道,如果是Client模式提交的,则是将我们提交的 --class 类通过JavaMainApplication包装一下然后直接本地调用start()启动服务,既:client提交模式,Driver启动在client本地机器上

04-JavaMainApplication

小结:

  • Spark提交是通过脚本调用SparkSubmit类,将我们参数进行封装以及校验,判断我们是基于那种提交模式(deployMode)来决定我们–main-class提交的类代码执行的位置
  • client模式,执行mainClass则是在本地机器
  • cluster模式,则是将mainClass封装成参数,传给ClientAPP进行下一步操作

下面我们把调用流程用图表示下

00-SparkSubmit-流程1

4、SparkSubmit提交流程02-ClientApp

由于本篇文章采用的是spark-cluster模式,mainClass是ClientApp,接下来我们看下,ClientApp.start()方法

由前面的文章Spark基础06-Spark client和cluster提交流程我们知道,Client需要与Master进行通信申请启动Driver,既我们可以猜测,Client需要有RpcEnv环境,同时要与Master进行通信,还需要获取Master的引用,下面我们从源码看下是否与我们猜测相符

03-ClientApp

代码逻辑如下,与我们猜测的一致,RpcEnv环境,Master引用,在源码里可以看到,另外我们关注下将driverArgs参数以及Master引用传入到ClientEndpoint端点类,并且通过rpcEnv.setupEndpoint()方法进行端点注册,读过之前文章Spark源码解析01-Master启动流程,肯定知道,在注册端点的时候会伴随这调用端点的onStart()方法,将服务启动,接下来我们需要关注点就是ClientEndpoint类的onStart()方法

// 参数转化为Driver参数
val driverArgs = new ClientArguments(args)

// 准备RpcEnv
val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// 得到Master引用
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
// 注册ClientEndpoint端点
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

进入ClientEndpoint.onStart()方法,我们可以看到一个重要的参数mainClass = “org.apache.spark.deploy.worker.DriverWrapper”这个参数看起来与我们要启动Driver很相似,下面我们简化代码逻辑

05-Client-onStart

mainClass以及driverArgs最后被封装到DriverDescription对象中,然后调用asyncSendToMasterAndForwardReply()想Master发送RequestSubmitDriver消息

// 定义mainClass
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)
// 将mainClass封装到DriverDescription()中
val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
// 异步向Master发送RequestSubmitDriver消息
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))

接下来我们看下asyncSendToMasterAndForwardReply()方法,通过Master端点引用,向Master发送RequestSubmitDriver消息,注意:此处用的是ask()方法,所以我们要去对应的Master.receiveAndReply()方法中寻找相应消息处理逻辑

05-Client-asyncSendToMasterAndForwardReply

小结:

  • org.apache.spark.deploy.worker.DriverWrapper可能是要启动的Driver类
  • ClientAPP类的作用就为了与Master通信做准备,如:启动的Driver全类名,以及Driver所需要的参数,同时ClientAPP也准备好了RpcEnv通信环境,并将所需的参数封装到DriverDescription对象中,最后异步向所持有的所有Master引用发送消息RequestSubmitDriver

下面我们将调用流程图补充下

00-SparkSubmit-流程2

5、SparkSubmit提交流程03-Master

前面我们已经知道了,ClientAPP向Master发送RequestSubmitDriver消息,并且将必须参数分装到DriverDescription中,下面我们回到Master类的receiveAndReply(),知道RequestSubmitDriver消息的处理逻辑,可以看到通过调用createDriver(description)并返回一个driver

05-Master-receiveAndReply

进入Master.createDriver()方法,看到这里只是将参数desc进行了封装,转换成DriverInfo对象并返回

06-Master-createDriver

07-DriverInfo

我们回到receiveAndReply()方法,从代码逻辑来看,将driver参数封装成DriverInfo后,就全部存储到Master的driver信息集合中,然后调用schedule()方法,最后就是返回消息给Client;

这边我们可以看到没有任何关于Driver启动的操作,也就剩下schedule()方法来启动Driver

// 将driver参数进一步封装为 DriverInfo
val driver = createDriver(description)
// 加入集合
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
// 调度
schedule()
// 返回信息
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))

06-Master-driver参数

相信看过之前的文章Spark源码解析02-Worker启动流程以及与Master心跳通信的人,对schedule()方法并不陌生,因为在注册Worker的时候也调用了schedule()方法,但是并未研究该方法的用途,现在我们来研究下schedule()

进入schedule()方法,首先要关注的就是注释:“为等待的应用程序调度当前可用资源,每当有新的应用程序加入或资源可用性发生变化时,就会调用此方法”,我们可以简单理解,所有涉及到资源调度的都会调用该方法,既:Driver、Worker等申请资源操作

06-Master-schedule

从前面我们知道,Master将Driver的参数信息存入到遇Driver相关的集合中,例:waitingDrivers、drivers、persistenceEngine,下面我们简化下代码逻辑,

  • Driver相关
    • 遍历等待启动的Driver信息集合
    • 寻找符合要求的Worker
    • 调用launchDriver()方法来启动Driver
private def schedule(): Unit = {

    // 寻找还活着的Worker
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0

    /*
     * 1、Driver相关调度
     */
    // 1.1、遍历等待集合中的Driver
    for (driver <- waitingDrivers.toList) {
      var launched = false
      var numWorkersVisited = 0
      // 1.2、寻找合适的Worker
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 1.3、启动Driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    /*
     * Worker相关调度
     */
    startExecutorsOnWorkers()
  }

下面我们看下launchDriver()启动Driver方法,将Driver信息记录在Worker中,然后拿着worker的引用,向Worker发送LaunchDriver消息,并将driver信息传递过去

06-Master-launchDriver

小结:

  • Master所需要做的就是通过获取Driver信息,在自己管理的Worker集合中寻找符合要求启动的Worker,然后向Worker发送LaunchDriver方法来启动Driver

6、SparkSubmit提交流程04-Worker

回到Worker.receive()方法,我们找到对应的LaunchDriver消息处理流程,可以看到将Driver信息封装到DriverRunner类中,然后调用start()方法,启动Driver,更新Worker使用信息

08-Worker-LaunchDriver

这边我们进入DriverRunner.start()方法,这边我们不必太过关注Driver启动的细节,因为java启动进程的实现在Window和Linux平台的实现是不一样的,我们只需知道启动的Driver正是前面我们所熟知的org.apache.spark.deploy.worker.DriverWrapper类

09-Driver-start

回到DriverWrapper类,我们可以看到这里又有一个mainClass,这个mainClass就是我们通过–class提交的那个全类名,从代码逻辑我们看到,mainClass在DriverWrapper中会被反射创建,并最后调用invoke()方法,执行其main方法,可见,DriverWrapper运行了我们提交的代码,也就是DriverWrapper正是Driver的实例

至此,我们关于Driver的启动流程已经全部跟踪完毕

10-DriverWrapper

小结:

  • Worker则是通过接收到的Driver信息,在自己的机器上启动Driver,也就是启动参数传递中的mainClass org.apache.spark.deploy.worker.DriverWrapper
  • 从源码可以证实org.apache.spark.deploy.worker.DriverWrapper就是我们所熟知的Driver实例
  • 至此,Driver启动流程基本完成

7、总结

  • Spark的提交流程中涉及角色分别是:
    • (SparkSubmit -->) Client --> Master --> Worker --> Executor --> Driver
  • (对于spark-cluster而言)Client的实例是ClientApp,Driver的实例则是DriverWrapper
  • 结合前面的文章我们已经跟踪完,Spark的资源层、以及计算层的开端源码,下面将是Spark源码解析的重头戏,SparkContext源码的解析

最后

以上就是野性河马为你收集整理的Spark源码解析03-Submit提交流程及Driver启动流程的全部内容,希望文章能够帮你解决Spark源码解析03-Submit提交流程及Driver启动流程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部