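Overview
Chapter 1 Introduction
In the previous articles, the TM (TaskManager) obtained its slots and offered them to the JM (JobMaster) for running tasks. This article continues the source-code walkthrough and covers how the JM submits tasks to the TM.
Chapter 2 Step-by-Step Walkthrough
2.1 Starting the JM
Let's return to the JM startup code we looked at earlier: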
org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    // Verify that we are running in the main thread
    validateRunsInMainThread();
    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }
    setNewFencingToken(newJobMasterId);
    // TODO actually start the JobMaster (JobManager) services
    startJobMasterServices();
    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
    // TODO reset and start the scheduler
    resetAndStartScheduler();
    return Acknowledge.get();
}
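The fencing-token comparison at the top of startJobExecution makes the call idempotent: a second start request carrying the same JobMasterId is acknowledged without starting anything again. Below is a minimal sketch of that guard in isolation. FencedStarterSketch, its UUID token, and the start counter are hypothetical names for illustration only, not Flink classes.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical illustration of a token-guarded, idempotent start; not Flink code.
class FencedStarterSketch {
    private UUID fencingToken; // null until the first successful start
    private int startCount;    // how many times the services were really started

    /** Returns true if the services were started, false if the call was a no-op. */
    boolean start(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");
        if (Objects.equals(fencingToken, newToken)) {
            return false; // already started under this token
        }
        fencingToken = newToken;
        startCount++;
        return true;
    }

    int getStartCount() {
        return startCount;
    }

    public static void main(String[] args) {
        FencedStarterSketch starter = new FencedStarterSketch();
        UUID id = UUID.randomUUID();
        System.out.println(starter.start(id)); // first call starts the services
        System.out.println(starter.start(id)); // repeated call is ignored
    }
}
```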
Here resetAndStartScheduler starts the scheduler. Let's keep reading.
2.2 Starting the Scheduler
org.apache.flink.runtime.jobmaster.JobMaster#resetAndStartScheduler
private void resetAndStartScheduler() throws Exception {
    // ...
    // TODO start scheduling
    schedulerAssignedFuture.thenRun(this::startScheduling);
}
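The thenRun call above only registers startScheduling as a callback; it runs once schedulerAssignedFuture completes. A small sketch with a plain CompletableFuture shows the ordering. The class name and the event strings are hypothetical and only mirror the shape of the call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: a callback registered with thenRun fires on completion.
class ThenRunSketch {

    /** Records events in the order they happen. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> schedulerAssigned = new CompletableFuture<>();

        // Registration only; nothing runs yet because the future is incomplete.
        schedulerAssigned.thenRun(() -> events.add("startScheduling"));
        events.add("callback registered");

        // Completing the future triggers the callback in the completing thread.
        schedulerAssigned.complete(null);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```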
org.apache.flink.runtime.jobmaster.JobMaster#startScheduling
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);
    // TODO start scheduling
    schedulerNG.startScheduling();
}
Implementation of org.apache.flink.runtime.scheduler.SchedulerNG#startScheduling:
org.apache.flink.runtime.scheduler.SchedulerBase#startScheduling
@Override
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    // TODO start the internal scheduling
    startSchedulingInternal();
}
org.apache.flink.runtime.scheduler.DefaultScheduler#startSchedulingInternal
@Override
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling();
    // TODO start task scheduling
    schedulingStrategy.startScheduling();
}
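The two methods above follow the template-method pattern: SchedulerBase#startScheduling is final and performs the common setup, then hands over to startSchedulingInternal, which DefaultScheduler overrides. A stripped-down sketch of that shape follows. All class names here are hypothetical, and the strings merely echo the method names from the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this only mirrors the shape of the call chain.
class TemplateMethodSketch {

    abstract static class BaseScheduler {
        protected final List<String> steps = new ArrayList<>();

        // final: subclasses cannot skip or reorder the common setup
        public final void startScheduling() {
            steps.add("registerJobMetrics");
            steps.add("startAllOperatorCoordinators");
            startSchedulingInternal();
        }

        // The only extension point left to concrete schedulers.
        protected abstract void startSchedulingInternal();
    }

    static class StrategyScheduler extends BaseScheduler {
        @Override
        protected void startSchedulingInternal() {
            steps.add("schedulingStrategy.startScheduling");
        }
    }

    static List<String> run() {
        BaseScheduler scheduler = new StrategyScheduler();
        scheduler.startScheduling();
        return scheduler.steps;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```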
2.3 Scheduling Tasks
Flink has several scheduling strategies, all implementing org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy.
Here we look at PipelinedRegionSchedulingStrategy.
Implementation of org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy#startScheduling:
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#startScheduling
@Override
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
        .toStream(schedulingTopology.getAllPipelinedRegions())
        .filter(region -> !region.getConsumedResults().iterator().hasNext())
        .collect(Collectors.toSet());
    // TODO schedule tasks region by region
    maybeScheduleRegions(sourceRegions);
}
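The filter above picks the source regions: a region that consumes no results has no upstream dependency, so it can be scheduled first. The sketch below applies the same predicate to a toy model. The Region class and the region names are hypothetical stand-ins for SchedulingPipelinedRegion, not Flink types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical toy model of "regions with no consumed results are sources".
class SourceRegionSketch {

    static final class Region {
        final String name;
        final List<String> consumedResults;

        Region(String name, List<String> consumedResults) {
            this.name = name;
            this.consumedResults = consumedResults;
        }
    }

    /** Same predicate as in the excerpt: keep regions whose input iterator is empty. */
    static Set<String> sourceRegionNames(List<Region> allRegions) {
        return allRegions.stream()
            .filter(region -> !region.consumedResults.iterator().hasNext())
            .map(region -> region.name)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Region> regions = Arrays.asList(
            new Region("source-A", Collections.<String>emptyList()),
            new Region("join", Arrays.asList("result-A", "result-B")),
            new Region("source-B", Collections.<String>emptyList()));
        System.out.println(sourceRegionNames(regions));
    }
}
```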
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegions
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    final List<SchedulingPipelinedRegion> regionsSorted =
        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
    for (SchedulingPipelinedRegion region : regionsSorted) {
        // TODO schedule tasks region by region
        maybeScheduleRegion(region);
    }
}
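maybeScheduleRegions sorts the candidate regions topologically before visiting them, so upstream regions are considered before their consumers. The excerpt does not show how sortPipelinedRegionsInTopologicalOrder works internally. One simple way to get such an order, assuming the full region list is already topologically sorted, is to filter that list by set membership. The sketch below does exactly that with plain strings; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch; assumes the full list is already in topological order.
class RegionOrderSketch {

    /** Keeps only the regions to schedule, preserving the topology's order. */
    static List<String> sortInTopologicalOrder(
            List<String> allRegionsInTopologicalOrder, Set<String> toSchedule) {
        return allRegionsInTopologicalOrder.stream()
            .filter(toSchedule::contains)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topology = Arrays.asList("source", "map", "sink");
        Set<String> toSchedule = new HashSet<>(Arrays.asList("sink", "source"));
        System.out.println(sortInTopologicalOrder(topology, toSchedule));
    }
}
```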
2.4 Deploying Tasks
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegion
private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
    // ...
    // TODO allocate slots and deploy the tasks
    schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
Implementation of org.apache.flink.runtime.scheduler.SchedulerOperations#allocateSlotsAndDeploy:
org.apache.flink.runtime.scheduler.DefaultScheduler#allocateSlotsAndDeploy
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    // ...
    // TODO wait for the slots, then deploy the tasks
    waitForAllSlotsAndDeploy(deploymentHandles);
}
The slots being waited for here are the ones the TM offered to the JM in the previous article.
org.apache.flink.runtime.scheduler.DefaultScheduler#waitForAllSlotsAndDeploy
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    // TODO wait for the slots, then deploy the tasks
    FutureUtils.assertNoException(
        assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}
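waitForAllSlotsAndDeploy chains deployAll onto a future that completes only after every slot has been assigned. The sketch below reproduces that "wait for all, then act" shape with CompletableFuture.allOf from the JDK. It is an illustration under assumptions: the class name, the string slots, and the result list are hypothetical, and the excerpt does not show how assignAllResources combines its futures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "deploy only after every slot future is done".
class WaitForAllSlotsSketch {

    /** Returns a list that is filled once all slot futures have completed. */
    static List<String> deployWhenAllAssigned(List<CompletableFuture<String>> slotFutures) {
        List<String> deployed = new ArrayList<>();
        CompletableFuture
            .allOf(slotFutures.toArray(new CompletableFuture<?>[0]))
            .handle((ignored, throwable) -> {
                if (throwable != null) {
                    deployed.add("deployment failed");
                    return null;
                }
                // Every future is done here, so join() does not block.
                for (CompletableFuture<String> slot : slotFutures) {
                    deployed.add("deploy into " + slot.join());
                }
                return null;
            });
        return deployed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        List<String> deployed = deployWhenAllAssigned(Arrays.asList(first, second));
        first.complete("slot-1");
        System.out.println(deployed); // still empty: one slot is missing
        second.complete("slot-2");
        System.out.println(deployed); // both deployments recorded
    }
}
```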
org.apache.flink.runtime.scheduler.DefaultScheduler#deployAll
private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
    return (ignored, throwable) -> {
        propagateIfNonNull(throwable);
        for (final DeploymentHandle deploymentHandle : deploymentHandles) {
            final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
            final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
            checkState(slotAssigned.isDone());
            // TODO deploy, or handle the error
            FutureUtils.assertNoException(
                slotAssigned.handle(deployOrHandleError(deploymentHandle)));
        }
        return null;
    };
}
org.apache.flink.runtime.scheduler.DefaultScheduler#deployOrHandleError
private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
    final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
    final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
    return (ignored, throwable) -> {
        if (executionVertexVersioner.isModified(requiredVertexVersion)) {
            log.debug("Refusing to deploy execution vertex {} because this deployment was " +
                "superseded by another deployment", executionVertexId);
            return null;
        }
        if (throwable == null) {
            // TODO deploy the task
            deployTaskSafe(executionVertexId);
        } else {
            handleTaskDeploymentFailure(executionVertexId, throwable);
        }
        return null;
    };
}
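The isModified check above guards against stale callbacks: a deployment remembers the vertex version it was created for and is skipped if the vertex has been modified since. A minimal sketch of that versioning idea follows, assuming a simple per-vertex counter. VersionerSketch and its method signatures are hypothetical and are not the API of Flink's ExecutionVertexVersioner.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-vertex version counter used to detect superseded deployments.
class VersionerSketch {
    private final Map<String, Long> currentVersions = new HashMap<>();

    /** Bumps the version of a vertex and returns the new value. */
    long recordModification(String vertexId) {
        return currentVersions.merge(vertexId, 1L, Long::sum);
    }

    /** True if the vertex has moved on since requiredVersion was handed out. */
    boolean isModified(String vertexId, long requiredVersion) {
        return currentVersions.getOrDefault(vertexId, 0L) != requiredVersion;
    }

    public static void main(String[] args) {
        VersionerSketch versioner = new VersionerSketch();
        long first = versioner.recordModification("vertex-0");
        System.out.println(versioner.isModified("vertex-0", first)); // still current
        versioner.recordModification("vertex-0");                    // e.g. a restart
        System.out.println(versioner.isModified("vertex-0", first)); // now superseded
    }
}
```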
org.apache.flink.runtime.scheduler.DefaultScheduler#deployTaskSafe
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
    try {
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
        // TODO deploy the task
        executionVertexOperations.deploy(executionVertex);
    } catch (Throwable e) {
        handleTaskDeploymentFailure(executionVertexId, e);
    }
}
Implementation of org.apache.flink.runtime.scheduler.ExecutionVertexOperations#deploy:
org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations#deploy
@Override
public void deploy(final ExecutionVertex executionVertex) throws JobException {
    // TODO deploy the execution vertex
    executionVertex.deploy();
}
org.apache.flink.runtime.executiongraph.ExecutionVertex#deploy
public void deploy() throws JobException {
    // TODO deploy the current execution attempt
    currentExecution.deploy();
}
org.apache.flink.runtime.executiongraph.Execution#deploy
public void deploy() throws JobException {
    // ...
    try {
        // TODO submit the task to the TM
        CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
            .thenCompose(Function.identity())
            .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure == null) {
                        vertex.notifyCompletedDeployment(this);
                    } else {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                jobMasterMainThreadExecutor);
    }
    catch (Throwable t) {
        markFailed(t);
        if (isLegacyScheduling()) {
            ExceptionUtils.rethrow(t);
        }
    }
}
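The chain in Execution#deploy is worth unpacking. supplyAsync runs the RPC call on another executor and therefore yields a future of a future, thenCompose(Function.identity()) flattens it, and the final stage turns the outcome into either success or failure. The sketch below rebuilds that chain with JDK types only. SubmitSketch, the string acknowledgements, and the result messages are hypothetical. The sketch also unwraps CompletionException before inspecting the cause, because CompletableFuture wraps failures from earlier stages that way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the supplyAsync / thenCompose / completion-handler chain.
class SubmitSketch {

    static CompletableFuture<String> submit(
            Supplier<CompletableFuture<String>> rpcCall, Executor executor) {
        return CompletableFuture
            .supplyAsync(rpcCall, executor)   // future of a future
            .thenCompose(Function.identity()) // flatten to a single future
            .handle((ack, failure) -> {
                if (failure == null) {
                    return "deployed: " + ack;
                }
                // Failures from earlier stages arrive wrapped in CompletionException.
                Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause()
                    : failure;
                if (cause instanceof TimeoutException) {
                    return "failed: TaskManager not responding";
                }
                return "failed: " + cause.getMessage();
            });
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the call in the caller thread
        System.out.println(submit(() -> CompletableFuture.completedFuture("ack"), direct).join());

        CompletableFuture<String> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new TimeoutException("rpc timeout"));
        System.out.println(submit(() -> timedOut, direct).join());
    }
}
```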
2.5 Executing the Task in the TM
The JM invokes the TM's method over RPC, and the task is then executed inside the TM.
Implementation of org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway#submitTask:
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask
@Override
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
    return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
}
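RpcTaskManagerGateway is a thin adapter: the caller passes only the deployment descriptor and timeout, and the adapter adds the jobMasterId it was constructed with before delegating to the TaskExecutor gateway. A minimal sketch of that delegation follows. The two interfaces and the string-based payloads are hypothetical simplifications, not Flink's gateway types.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an adapter that binds a fixed job master id to each call.
class GatewayAdapterSketch {

    /** Stand-in for the TM-side gateway, which expects the job master id. */
    interface ExecutorGateway {
        CompletableFuture<String> submitTask(String task, String jobMasterId);
    }

    /** Stand-in for the JM-side view, which does not expose the id. */
    interface ManagerGateway {
        CompletableFuture<String> submitTask(String task);
    }

    /** Wraps the target so that every call carries the given job master id. */
    static ManagerGateway bind(ExecutorGateway target, String jobMasterId) {
        return task -> target.submitTask(task, jobMasterId);
    }

    public static void main(String[] args) {
        ExecutorGateway echo =
            (task, id) -> CompletableFuture.completedFuture(task + " accepted from " + id);
        ManagerGateway gateway = bind(echo, "jm-1");
        System.out.println(gateway.submitTask("map-0").join());
    }
}
```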
Implementation of org.apache.flink.runtime.taskexecutor.TaskExecutorGateway#submitTask:
org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask
@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd,
        JobMasterId jobMasterId,
        Time timeout) {
    try {
        // ...
        // TODO create the Task
        Task task = new Task(
            jobInformation,
            taskInformation,
            tdd.getExecutionAttemptId(),
            tdd.getAllocationId(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber(),
            tdd.getProducedPartitions(),
            tdd.getInputGates(),
            tdd.getTargetSlotNumber(),
            memoryManager,
            taskExecutorServices.getIOManager(),
            taskExecutorServices.getShuffleEnvironment(),
            taskExecutorServices.getKvStateService(),
            taskExecutorServices.getBroadcastVariableManager(),
            taskExecutorServices.getTaskEventDispatcher(),
            externalResourceInfoProvider,
            taskStateManager,
            taskManagerActions,
            inputSplitProvider,
            checkpointResponder,
            taskOperatorEventGateway,
            aggregateManager,
            classLoaderHandle,
            fileCache,
            taskManagerConfiguration,
            taskMetricGroup,
            resultPartitionConsumableNotifier,
            partitionStateChecker,
            getRpcService().getExecutor());
        taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
        log.info("Received task {} ({}), deploy into slot with allocation id {}.",
            task.getTaskInfo().getTaskNameWithSubtasks(), tdd.getExecutionAttemptId(), tdd.getAllocationId());
        boolean taskAdded;
        try {
            // TODO add the task to its designated slot, identified by the allocation id
            taskAdded = taskSlotTable.addTask(task);
        } catch (SlotNotFoundException | SlotNotActiveException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }
        // TODO if the task was added, start the task thread to execute it
        if (taskAdded) {
            task.startTaskThread();
            setupResultPartitionBookkeeping(
                tdd.getJobId(),
                tdd.getProducedPartitions(),
                task.getTerminationFuture());
            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager already contains a task for id " +
                task.getExecutionId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }
    } catch (TaskSubmissionException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
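The outcome of taskSlotTable.addTask drives the rest of submitTask: an exception means the slot is unknown or inactive, false means a task with the same execution id is already registered, and true lets the task thread start. The sketch below models those three outcomes with a map keyed by allocation id. SlotTableSketch is hypothetical, and its exception is unchecked purely to keep the example short. It is not Flink's TaskSlotTable, and it omits the "slot not active" case.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a slot table keyed by allocation id.
class SlotTableSketch {

    /** Unchecked here for brevity; thrown when no slot matches the allocation id. */
    static class SlotNotFoundException extends RuntimeException {
        SlotNotFoundException(String allocationId) {
            super("No slot for allocation id " + allocationId);
        }
    }

    private final Map<String, Set<String>> tasksBySlot = new HashMap<>();

    void allocateSlot(String allocationId) {
        tasksBySlot.putIfAbsent(allocationId, new HashSet<>());
    }

    /** Returns false if the task is already registered in that slot. */
    boolean addTask(String allocationId, String executionId) {
        Set<String> tasks = tasksBySlot.get(allocationId);
        if (tasks == null) {
            throw new SlotNotFoundException(allocationId);
        }
        return tasks.add(executionId);
    }

    public static void main(String[] args) {
        SlotTableSketch table = new SlotTableSketch();
        table.allocateSlot("alloc-1");
        System.out.println(table.addTask("alloc-1", "attempt-1")); // added, task may start
        System.out.println(table.addTask("alloc-1", "attempt-1")); // duplicate, rejected
        try {
            table.addTask("alloc-2", "attempt-2");
        } catch (SlotNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```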
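At this point the TM officially starts executing the task we submitted!
That concludes the walkthrough of the whole job submission flow in Flink's per-job on YARN mode. Later articles will fill in some of the details. Thanks for reading!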