Overview
Original article: https://blog.csdn.net/zhanglh046/article/details/78485701
Master is the master node in Spark Standalone mode; it manages the cluster and is responsible for resource scheduling. It extends the ThreadSafeRpcEndpoint and LeaderElectable traits: extending ThreadSafeRpcEndpoint makes Master an RpcEndpoint, and implementing LeaderElectable lets it take part in leader election.
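For orientation, here is a sketch of the class declaration (abridged from the Spark 2.x source; the field and method bodies are omitted):

private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable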
I. Important Attributes
- RpcEnv rpcEnv: used to register and maintain RpcEndpoints and RpcEndpointRefs. RpcEnv defines the abstract methods for starting, stopping, and shutting down the RPC framework.
- RpcAddress address: the host and port of the RPC endpoint.
- Int webUiPort: the web UI port.
- HashSet[WorkerInfo] workers: info for all workers.
- HashMap[String, WorkerInfo] idToWorker: mapping from worker ID to WorkerInfo.
- HashMap[RpcAddress, WorkerInfo] addressToWorker: mapping from a worker's RPC address to its WorkerInfo.
- HashSet[ApplicationInfo] apps: info for all applications.
- HashMap[String, ApplicationInfo] idToApp: mapping from application ID to ApplicationInfo.
- ArrayBuffer[ApplicationInfo] waitingApps: applications currently waiting to be scheduled.
- ArrayBuffer[ApplicationInfo] completedApps: applications that have completed.
- HashMap[RpcEndpointRef, ApplicationInfo] endpointToApp: mapping from RpcEndpointRef to ApplicationInfo.
- HashMap[RpcAddress, ApplicationInfo] addressToApp: mapping from RpcAddress to ApplicationInfo.
- HashSet[DriverInfo] drivers: info for all drivers.
- ArrayBuffer[DriverInfo] waitingDrivers: drivers currently waiting to be scheduled.
- ArrayBuffer[DriverInfo] completedDrivers: drivers that have completed.
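A sketch of how these fields are declared inside Master (abridged from the Spark 2.x source; visibility modifiers may differ by version). Plain mutable Scala collections are safe here because ThreadSafeRpcEndpoint processes messages one at a time:

  val workers = new HashSet[WorkerInfo]
  private val idToWorker = new HashMap[String, WorkerInfo]
  private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
  val apps = new HashSet[ApplicationInfo]
  val idToApp = new HashMap[String, ApplicationInfo]
  private val waitingApps = new ArrayBuffer[ApplicationInfo]
  private val completedApps = new ArrayBuffer[ApplicationInfo]
  private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
  private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
  val drivers = new HashSet[DriverInfo]
  private val waitingDrivers = new ArrayBuffer[DriverInfo]
  private val completedDrivers = new ArrayBuffer[DriverInfo]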
II. Two Important Configurations
- spark.worker.timeout: after a Worker starts, it periodically sends heartbeat messages to the Master. This setting is how long the Master waits without receiving a heartbeat before it considers the Worker lost; the default is 60s.
- spark.deploy.recoveryMode: the recovery mode used for Master failover; the default is NONE. (A sketch of how both settings are read follows.)
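A sketch of how the Master reads these two settings (from the constructor area of Master.scala; note the timeout is configured in seconds and converted to milliseconds):

  private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")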
III. Core Methods
1. startRpcEnvAndEndpoint()
- Registers the Master RpcEndpoint and returns an RpcEndpointRef used to send it messages.
- Sends a request to the Master endpoint to obtain the ports it bound.
- Returns a (RpcEnv, webUIPort, restPort) tuple.
- Once any other endpoint holds the Master's RpcEndpointRef, it can communicate with the Master by sending messages through it.
private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(host: String, port: Int, webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // Create the RpcEnv instance
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // Register the 'Master' RpcEndpoint with the RpcEnv and get back an
    // RpcEndpointRef used to send it messages
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    // Ask the Master endpoint for the ports it bound
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    // Return (RpcEnv, webUIPort, restPort)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
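As a usage sketch (assuming a client-side RpcEnv named rpcEnv; "master-host" and port 7077 are placeholders): once another process knows the master's address, it can look up the Master endpoint and ask it questions, just as startRpcEnvAndEndpoint does above:

  val masterRef = rpcEnv.setupEndpointRef(
    RpcAddress("master-host", 7077), Master.ENDPOINT_NAME)
  val ports = masterRef.askWithRetry[BoundPortsResponse](BoundPortsRequest)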
2. onStart()
- Called when the Master is initialized.
- Builds the web UI and starts the REST server.
- Schedules a recurring task on a daemon thread that periodically checks for timed-out workers.
- Performs Master HA setup (initializes the persistence engine according to the configured recovery mode, and creates the leader-election agent).
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  // Build the Master web UI, used to inspect submitted applications etc.
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  if (reverseProxy) {
    masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
    logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
      s"Applications UIs are available at $masterWebUiUrl")
  }
  // Schedule a recurring task on a daemon thread that periodically checks
  // for timed-out workers (interval: spark.worker.timeout)
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  // If the REST server is enabled, start it; various requests can be
  // submitted to the master through this service
  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    // ZOOKEEPER: persist recovery state to ZooKeeper
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory =
        new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    // FILESYSTEM: persist recovery state to the file system
    case "FILESYSTEM" =>
      val fsFactory =
        new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    // CUSTOM: instantiate the user-supplied factory class (fully qualified
    // name in spark.deploy.recoveryMode.factory) and use it to persist state
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
3. receive()
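Before the full listing, here is a minimal, self-contained sketch of the partial-function dispatch pattern that receive relies on (the message types are invented for illustration):

  sealed trait Msg
  case object Ping extends Msg
  case class Echo(text: String) extends Msg

  val handler: PartialFunction[Any, Unit] = {
    case Ping       => println("pong")
    case Echo(text) => println(text)
  }

  handler.isDefinedAt(Ping)  // true -- lets the dispatcher test coverage first
  handler(Echo("hello"))     // prints "hello"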
/**
 * receive is a partial function, i.e. an instance of PartialFunction[A, B],
 * where A is the type of values it can accept and B the type it returns.
 * Here the type is PartialFunction[Any, Unit]: it can accept any message
 * and returns nothing.
 */
override def receive: PartialFunction[Any, Unit] = {
  // ElectedLeader: this Master has been elected leader
  case ElectedLeader =>
    // The configured spark.deploy.recoveryMode determines the persistence
    // engine; read the persisted data back through it as a
    // (applicationInfo, driverInfo, workerInfo) triple
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    // If all three persisted collections are empty the state is ALIVE,
    // otherwise RECOVERING
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    // If we are in RECOVERING state
    if (state == RecoveryState.RECOVERING) {
      // Start recovering the persisted data
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      // Schedule a task on the background thread to check later whether
      // the master has completed recovery
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }

  // CompleteRecovery: finish the recovery process
  case CompleteRecovery => completeRecovery()

  // RevokedLeadership: shut this Master down, which triggers a new
  // leader election
  case RevokedLeadership =>
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)
  // RegisterWorker: a new Worker is registering with the Master
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    // If this node is in STANDBY state, reply MasterInStandby
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      // The worker id -> WorkerInfo map already contains this worker id,
      // so reply with a duplicate-id RegisterWorkerFailed
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      // This node is the active master and the worker id is new: create
      // the WorkerInfo and register it. On success reply RegisteredWorker
      // and start scheduling; otherwise reply RegisterWorkerFailed
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  // RegisterApplication: register an application, but only if this
  // Master is the leader
  case RegisterApplication(description, driver) =>
    // A standby (non-leader) master must not create or register applications
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      // Create the ApplicationInfo for this application and its driver
      val app = createApplication(description, driver)
      // Register the application
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      // Persist the application through the persistence engine
      persistenceEngine.addApplication(app)
      // Tell the driver that registration is complete
      driver.send(RegisteredApplication(app.id, self))
      // Re-run scheduling
      schedule()
    }
  // ExecutorStateChanged: an executor's state has changed
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    // Look up the executor through the application that runs it
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) =>
        // Get the ApplicationInfo
        val appInfo = idToApp(appId)
        // Update the executor's state to the reported state
        val oldState = exec.state
        exec.state = state
        // If the new state is RUNNING, reset the retry count to 0
        if (state == ExecutorState.RUNNING) {
          assert(oldState == ExecutorState.LAUNCHING,
            s"executor $execId state transfer from $oldState to RUNNING is illegal")
          appInfo.resetRetryCount()
        }
        // Forward an ExecutorUpdated message to the executor's application driver
        exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
        // If the new state is a finished state
        if (ExecutorState.isFinished(state)) {
          logInfo(s"Removing executor ${exec.fullId} because it is $state")
          // If the application is still running, remove this executor from it
          if (!appInfo.isFinished) {
            appInfo.removeExecutor(exec)
          }
          // Also remove the executor from the worker that hosted it
          exec.worker.removeExecutor(exec)
          // Retry only a bounded number of times instead of looping forever
          val normalExit = exitStatus == Some(0)
          // As long as the retry count stays below MAX_EXECUTOR_RETRIES
          // (default 10), the application keeps getting rescheduled
          if (!normalExit
              && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
              && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        // Re-run scheduling
        schedule()
      case None =>
        logWarning(s"Got status update for unknown executor $appId/$execId")
    }
  // DriverStateChanged: a driver's state has changed
  case DriverStateChanged(driverId, state, exception) =>
    // If the driver's state is ERROR | FINISHED | KILLED | FAILED, remove it
    state match {
      case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
        removeDriver(driverId, state, exception)
      case _ =>
        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
    }
  // Heartbeat: the heartbeat mechanism, initiated by a worker towards the master
  case Heartbeat(workerId, worker) =>
    // Look up the worker by workerId
    idToWorker.get(workerId) match {
      // If the worker exists, refresh its lastHeartbeat; otherwise this
      // worker is not currently registered with the master
      case Some(workerInfo) =>
        workerInfo.lastHeartbeat = System.currentTimeMillis()
      case None =>
        // If the workers set still knows this worker id, it was registered once
        if (workers.map(_.id).contains(workerId)) {
          logWarning(s"Got heartbeat from unregistered worker $workerId." +
            " Asking it to re-register.")
          // so the master asks the worker to re-register
          worker.send(ReconnectWorker(masterUrl))
        } else {
          logWarning(s"Got heartbeat from unregistered worker $workerId." +
            " This worker was never registered, so ignoring the heartbeat.")
        }
    }
  // MasterChangeAcknowledged: the application has acknowledged the new
  // master; set its state back to WAITING
  case MasterChangeAcknowledged(appId) =>
    idToApp.get(appId) match {
      case Some(app) =>
        logInfo("Application has been re-registered: " + appId)
        app.state = ApplicationState.WAITING
      case None =>
        logWarning("Master change ack from unknown app: " + appId)
    }
    // If recovery can be completed now, complete it
    if (canCompleteRecovery) { completeRecovery() }
  // WorkerSchedulerStateResponse: a worker reports its scheduler state
  // during recovery
  case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
    // Look up the worker by workerId
    idToWorker.get(workerId) match {
      case Some(worker) =>
        logInfo("Worker has been re-registered: " + workerId)
        // Mark the worker ALIVE
        worker.state = WorkerState.ALIVE
        // Keep only the executors whose application is still known
        val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
        // Iterate over the valid executors
        for (exec <- validExecutors) {
          // Get the application the executor belongs to
          val app = idToApp.get(exec.appId).get
          // Re-attach the executor to the app (which worker, how many cores, ...)
          val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
          // and add the executor back to the worker
          worker.addExecutor(execInfo)
          execInfo.copyState(exec)
        }
        // Mark all reported drivers RUNNING and attach them to the worker
        for (driverId <- driverIds) {
          drivers.find(_.id == driverId).foreach { driver =>
            driver.worker = Some(worker)
            driver.state = DriverState.RUNNING
            worker.drivers(driverId) = driver
          }
        }
      case None =>
        logWarning("Scheduler state from unknown worker: " + workerId)
    }
    // If recovery can be completed now, complete it
    if (canCompleteRecovery) { completeRecovery() }
  // WorkerLatestState: a worker reports its latest state
  case WorkerLatestState(workerId, executors, driverIds) =>
    // Look up the worker by workerId
    idToWorker.get(workerId) match {
      case Some(worker) =>
        // For each reported executor, check whether it matches one of the
        // executors the master has recorded for this worker
        for (exec <- executors) {
          val executorMatches = worker.executors.exists {
            case (_, e) => e.application.id == exec.appId && e.id == exec.execId
          }
          if (!executorMatches) {
            // The master doesn't recognize this executor, so tell the worker to kill it
            worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
          }
        }
        // Same check for the reported drivers
        for (driverId <- driverIds) {
          val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
          if (!driverMatches) {
            // The master doesn't recognize this driver, so tell the worker to kill it
            worker.endpoint.send(KillDriver(driverId))
          }
        }
      case None =>
        logWarning("Worker state from unknown worker: " + workerId)
    }
  // UnregisterApplication: unregister the application and remove it from
  // this master
  case UnregisterApplication(applicationId) =>
    logInfo(s"Received unregister request from application $applicationId")
    idToApp.get(applicationId).foreach(finishApplication)

  // CheckForWorkerTimeOut: check for timed-out workers
  case CheckForWorkerTimeOut =>
    timeOutDeadWorkers()
}
4. receiveAndReply()
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RequestSubmitDriver: submit a driver and update the driver state the
  // master maintains
  case RequestSubmitDriver(description) =>
    // If the master is not ALIVE, reply with an error
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      // Create the driver
      val driver = createDriver(description)
      // Persist the driver through the persistence engine
      persistenceEngine.addDriver(driver)
      // Add the driver to the drivers and waitingDrivers collections
      waitingDrivers += driver
      drivers.add(driver)
      // Start scheduling
      schedule()
      // Reply with a success message
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }

  // RequestKillDriver: kill the given driver
  case RequestKillDriver(driverId) =>
    // If the master is not ALIVE, reply with an error
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        s"Can only kill drivers in ALIVE state."
      context.reply(KillDriverResponse(self, driverId, success = false, msg))
    } else {
      logInfo("Asked to kill driver " + driverId)
      // Find the driver
      val driver = drivers.find(_.id == driverId)
      driver match {
        // Remove the driver from the collections the master maintains
        case Some(d) =>
          // If the driver is still waiting to be scheduled
          if (waitingDrivers.contains(d)) {
            // remove it and send a DriverStateChanged message
            waitingDrivers -= d
            self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
          } else {
            // otherwise tell the driver's worker to kill it
            d.worker.foreach { w =>
              w.endpoint.send(KillDriver(driverId))
            }
          }
          // Reply with a KillDriverResponse
          val msg = s"Kill request for $driverId submitted"
          logInfo(msg)
          context.reply(KillDriverResponse(self, driverId, success = true, msg))
        case None =>
          val msg = s"Driver $driverId has already finished or does not exist"
          logWarning(msg)
          context.reply(KillDriverResponse(self, driverId, success = false, msg))
      }
    }

  // RequestDriverStatus: query a driver's status
  case RequestDriverStatus(driverId) =>
    // If the master is not ALIVE, reply with an error
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only request driver status in ALIVE state."
      context.reply(
        DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
    } else {
      // Search both the running and the completed drivers and reply with
      // what was found
      (drivers ++ completedDrivers).find(_.id == driverId) match {
        case Some(driver) =>
          context.reply(DriverStatusResponse(found = true, Some(driver.state),
            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
        case None =>
          context.reply(DriverStatusResponse(found = false, None, None, None, None))
      }
    }

  // RequestMasterState: query the master's state
  case RequestMasterState =>
    context.reply(MasterStateResponse(
      address.host, address.port, restServerBoundPort,
      workers.toArray, apps.toArray, completedApps.toArray,
      drivers.toArray, completedDrivers.toArray, state))

  // BoundPortsRequest: query the ports the master bound
  case BoundPortsRequest =>
    context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

  // RequestExecutors: set the target number of executors for an application
  case RequestExecutors(appId, requestedTotal) =>
    context.reply(handleRequestExecutors(appId, requestedTotal))

  // KillExecutors: kill the given executors of an application and reply
  // with the outcome
  case KillExecutors(appId, executorIds) =>
    val formattedExecutorIds = formatExecutorIds(executorIds)
    context.reply(handleKillExecutors(appId, formattedExecutorIds))
}
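A sketch of the client side of these ask-style messages (masterRef as obtained in the sketch after startRpcEnvAndEndpoint; the driver id is a made-up example):

  val status = masterRef.askWithRetry[DriverStatusResponse](
    RequestDriverStatus("driver-20171109120000-0001"))
  val masterState = masterRef.askWithRetry[MasterStateResponse](RequestMasterState)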
5. onDisconnected()
override def onDisconnected(address: RpcAddress): Unit = {
  // The disconnected client could've been either a worker or an app;
  // remove whichever it was
  logInfo(s"$address got disassociated, removing it.")
  addressToWorker.get(address).foreach(removeWorker)
  addressToApp.get(address).foreach(finishApplication)
  if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
6. beginRecovery()
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  // For every stored application: register it, mark it UNKNOWN, and notify
  // its driver of the master change
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Set the application state to UNKNOWN
      app.state = ApplicationState.UNKNOWN
      // Send a MasterChanged message to the application's driver
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  // Add every stored driver to the drivers set the master maintains
  for (driver <- storedDrivers) {
    drivers += driver
  }
  // For every stored worker: register it with the master
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // Registering the worker updates the master's worker set and the
      // worker-related maps
      registerWorker(worker)
      // Set the worker state to UNKNOWN
      worker.state = WorkerState.UNKNOWN
      // Send a MasterChanged message to the worker
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
7. completeRecovery()
// Recovery is done: reschedule drivers and redistribute resources
private def completeRecovery() {
  // Return unless the state is RECOVERING
  if (state != RecoveryState.RECOVERING) { return }
  // Move to COMPLETING_RECOVERY
  state = RecoveryState.COMPLETING_RECOVERY
  // Kill off workers and applications that did not respond and are
  // therefore still in UNKNOWN state
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
  // Re-handle drivers that were not claimed by any worker, i.e. that have
  // no worker running them
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    // If the driver is supervised, relaunch it; otherwise remove it
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
  // Move to ALIVE
  state = RecoveryState.ALIVE
  // Redistribute resources
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
8. removeWorker()
private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // Mark the worker DEAD
  worker.setState(WorkerState.DEAD)
  // Remove the worker from the worker-related maps
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  if (reverseProxy) {
    webUi.removeProxyTargets(worker.id)
  }
  // For every executor on this worker
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    // Send an ExecutorUpdated message to the driver of the application
    // this executor belongs to
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
    // Mark the executor LOST
    exec.state = ExecutorState.LOST
    // Remove the executor from its application
    exec.application.removeExecutor(exec)
  }
  // For every driver on this worker
  for (driver <- worker.drivers.values) {
    // If the driver is supervised, relaunch it; otherwise remove it
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // Remove the worker from the persistence engine
  persistenceEngine.removeWorker(worker)
}
9. removeApplication()
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  // If the master's application set contains this application, remove it,
  // together with its entries in the application-related maps
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (reverseProxy) {
      webUi.removeProxyTargets(app.id)
    }
    // If the number of completed applications has reached
    // spark.deploy.retainedApplications
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      // Compute how many to drop
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach { a =>
        applicationMetricsSystem.removeSource(a.appSource)
      }
      // Drop that many apps from the front of the completedApps buffer
      completedApps.trimStart(toRemove)
    }
    // Add this app to the completed list
    completedApps += app // Remember it in our history
    // Remove this app from the waiting list
    waitingApps -= app
    // Kill all the executors running this app
    for (exec <- app.executors.values) {
      killExecutor(exec)
    }
    // Mark the application with its final state
    app.markFinished(state)
    // If the app did not end in FINISHED state, send it an
    // ApplicationRemoved message
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }
    // Remove the application from the persistence engine
    persistenceEngine.removeApplication(app)
    // Re-run scheduling
    schedule()
    // Tell all workers that the app has finished
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}
10. registerApplication()
private def registerApplication(app: ApplicationInfo): Unit = {
  // Get the RpcAddress of the app's driver
  val appAddress = app.driver.address
  // If it is already registered, return immediately
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  applicationMetricsSystem.registerSource(app.appSource)
  // Add the app to the application set the master maintains
  apps += app
  // and record it in the application-related maps
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}
11. registerWorker()
private def registerWorker(worker: WorkerInfo): Boolean = {
  // Drop any DEAD worker on the same host and port from the worker set
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }
  // Get the RpcAddress of the worker being registered
  val workerAddress = worker.endpoint.address
  // If the RpcAddress -> WorkerInfo map already contains this address,
  // look at the old worker
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    // If the old worker's state is UNKNOWN
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // the worker is reconnecting during recovery, so the old entry
      // must be removed
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  // Add the worker to the worker set the master maintains
  workers += worker
  // and update the worker-related maps
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  true
}
12. timeOutDeadWorkers()
/** Remove workers whose heartbeats have timed out */
private def timeOutDeadWorkers() {
  // Collect the workers whose lastHeartbeat is older than the timeout
  val currentTime = System.currentTimeMillis()
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  // Remove each of them unless it is already DEAD
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT_MS / 1000))
      removeWorker(worker)
    } else {
      // The worker is already DEAD; once it has been kept around long
      // enough, cull it from the workers set
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
        workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
      }
    }
  }
}
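A worked example with the defaults, assuming REAPER_ITERATIONS comes from spark.dead.worker.persistence with a default of 15 as in the Spark source: with spark.worker.timeout = 60s, a silent worker is marked DEAD after 60s without a heartbeat, and its DEAD entry is finally culled from workers once (15 + 1) * 60s = 16 minutes have passed since its last heartbeat.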
13. onStop()
override def onStop() {
masterMetricsSystem.report()
applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
leaderElectionAgent.stop()
}