概述
心跳实际上是一个RPC函数。
TaskTracker周期性调用该函数汇报节点和任务状态信息,形成心跳。
三个作用:
- 判断TaskTracker是否活着
- 及时让JobTracker获取各个节点上的资源使用情况和任务运行状态。
- 为TaskTracker分配任务。
TaskTracker周期性调用RPC函数heartbeat向JobTracker汇报信息和领取任务
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status , boolean restarted, boolean initialContact,boolean acceptNewTasks,short responseId)
/*
status:封装了TaskTracker上的状态信息
Restarted:标识TaskTracker是否重新启动
initialContact:标识TaskTracker是否初次连接
acceptNewTasks:标识TaskTracker是否可以接收新任务,这通常取决于slot是否具有剩余和节点健康情况等。
responseId:表示心跳响应编号,防止重复发送心跳。每接收一次心跳,该值加1
*/
该函数返回一个HeartbeatResponse对象,封装了JobTracker向TaskTracker下达的命令。
class HeartbeatResponse implements Writable , Configurable{
...
short responseId;//心跳响应编号
int heartbeatInterval;//下次心跳的发送间隔
TaskTrackerAction[] actions;//来自JobTracker的命令
Set<JobID> recoveredJobs = new Hashset<JobID>();//恢复完成的作业列表
函数内部实现逻辑主要两步骤:更新状态和下达命令。JobTracker首先将TaskTracker汇报的最新任务运行状态保存到相应数据结构中,然后根据这些状态信息和外界需求为其下达相应的命令。
更新状态:
heartbeat函数首先更新TaskTracker/Job/Task的状态信息
下达命令:
更新完状态后,JobTracker为TaskTracker构造一个HeartbeatResponse对象作为心跳应答, 该对象主要两部分内容:下达给TaskTracker的命令和下次汇报心跳的时间间隔。
下达命令:
- ReinitTrackerAction:一致性检查用的,如果出现以下情况:①丢失上次心跳应答信息:JobTracker会保存向每个TaskTracker 发送的最近心跳应答信息,如果这个最近心跳丢失,就出现不一致。②丢失TaskTracker状态信息:JobTracker更新TaskTracker状态会保存起来,如果TaskTracker不是第一次连接JobTracker但状态信息不存在,也是不一致状态。
出现不一致情况,就会发送这条命令- LaunchTaskAction:该类封装了TaskTracker新分配的任务。TaskTracker收到该命令后会启动一个子进程运行该任务。
- KillTaskAction:该类封装了TaskTracker需杀死的任务。导致发送该命令的原因:①用户主动杀死②启用推测执行机制后,同一份数据可能同时两个Task Attempt处理,其中一个执行完了,另一个就要kill③某个作业任务失败,所有任务kill⑥TaskTracker一段时间内没有汇报心跳,JobTracker认为它死掉
- KillJobAction:该类封装了TaskTracker待清理的作业,TaskTracker接收该命令后,清理作业的临时目录。
- CommitTaskAction:封装了TaskTracker需提交的任务。
调整心跳间隔:
心跳间隔太小:JobTracker处理高并发的心跳连接请求,产生过大的并发压力
心跳间隔太大:出现空闲资源时不能及时汇报给JobTracker,造成资源空闲, 降低系统吞吐率
TaksTracker汇报心跳的时间间隔是动态的,根据集群规模调整,在Hadoop中,只有JobTracker直到集群规模,所以JobTracker为每个TaskTacker计算下一次汇报心跳的时间间隔,并通过心跳机制告诉TaskTracker。
JobTracker允许用户使用参数配置心跳的时间间隔加速比,每增加mapred.heartbeats.in.second个节点(默认是100),心跳时间间隔就增加mapreduce.jobtarcker.heartbeat.scaling.factor秒(默认1)。JobTracker要求心跳时间间隔至少3秒。
public int getNextHeartbeatInterval(){
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatIntercal = Math.max{
(int)(1000 * HEARTBEATS_SCALING_FACTOR*Math.ceil((double)clusterSize / NUM_HEARTBEATS_IN_SECOND)),HEARTBEAT_INTERVAL_MIN);
return heartbeatInterval;
}
最后
以上就是繁荣月亮为你收集整理的JobTracker内部原理——心跳接收与应答的全部内容,希望文章能够帮你解决JobTracker内部原理——心跳接收与应答所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复