概述
DataNode在启动之后会周期性地向NameNode发送心跳,同时DataNode也会收到来自NameNode的响应,响应内部包含了NameNode下发给DataNode的一些指令,那么内部是如何实现的呢?
一、源码剖析
实际上,HDFS的心跳是通过BPServiceActor线程实现的。BPServiceActor类实现了Runnable接口,其run方法内部包含了向NameNode注册和发送心跳的代码,其中offerService()就是DataNode心跳机制的核心方法。
/**
 * Thread entry point of the service actor: repeatedly invokes
 * {@code offerService()} for as long as {@code shouldRun()} returns true.
 */
public void run() {
// ... (code elided in this excerpt)
// Inside the while loop:
while (shouldRun()) {
try {
// Core method of the DataNode heartbeat mechanism.
offerService();
} catch (Exception ex) {
// Log the failure and pause before the loop re-enters offerService().
// NOTE(review): 5000 is presumably a duration in milliseconds - confirm
// against sleepAndLogInterrupts.
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
}
offerService()具体实现
/**
 * Main loop for each BP thread. Run until shutdown,
 * forever calling remote NameNode functions.
 *
 * NOTE: this is a truncated excerpt; the rest of the loop body and the
 * closing of the try/while blocks are not shown.
 */
private void offerService() throws Exception {
//
// Now loop for a long time....
//
while (shouldRun()) {
try {
final long startTime = monotonicNow();
// A heartbeat is due once at least dnConf.heartBeatInterval has elapsed
// since the previous one (author's note: every 3 seconds by default).
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
// Send the heartbeat. The HeartbeatResponse carries the commands that
// the NameNode issues to this DataNode.
HeartbeatResponse resp = sendHeartBeat();
// ... (code elided in this excerpt)
// Process the commands received from the NameNode; when processCommand
// returns false, skip the remainder of this loop iteration.
if (!processCommand(resp.getCommands()))
continue;
}
}
// ... (remainder of the loop body elided in this excerpt)
} // offerService
sendHeartBeat()具体实现
/**
 * Gathers this DataNode's current storage reports, cache figures, transfer
 * counters and volume-failure information, then sends them to the NameNode
 * as one heartbeat through the {@code bpNamenode} proxy.
 *
 * @return the response returned by {@code bpNamenode.sendHeartbeat}
 * @throws IOException declared by this method; propagated to the caller
 */
HeartbeatResponse sendHeartBeat() throws IOException {
  // Storage reports for this block pool (author's note: each report carries
  // the storage, a failed flag, capacity, dfsUsed, remaining, blockPoolUsed).
  final StorageReport[] storageReports =
      dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());

  if (LOG.isDebugEnabled()) {
    LOG.debug("Sending heartbeat with " + storageReports.length +
        " storage reports from service actor: " + this);
  }

  // Volume-failure details; a missing summary counts as zero failed volumes.
  final VolumeFailureSummary failureSummary =
      dn.getFSDataset().getVolumeFailureSummary();
  final int failedVolumeCount;
  if (failureSummary == null) {
    failedVolumeCount = 0;
  } else {
    failedVolumeCount = failureSummary.getFailedStorageLocations().length;
  }

  // The key step: invoke sendHeartbeat on the NameNode proxy object.
  return bpNamenode.sendHeartbeat(
      bpRegistration,
      storageReports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      failedVolumeCount,
      failureSummary);
}
bpNamenode获取的是NameNode的RPC代理对象,对应的类实际上是NameNodeRpcServer,查看NameNodeRpcServer的sendHeartbeat方法
/**
 * Receives a DataNode heartbeat: runs the startup and request checks, then
 * hands the heartbeat to {@code namesystem.handleHeartbeat}.
 *
 * @param nodeReg registration of the DataNode sending the heartbeat
 * @param report storage reports supplied by the DataNode
 * @param dnCacheCapacity cache capacity reported by the DataNode
 * @param dnCacheUsed cache usage reported by the DataNode
 * @param xmitsInProgress transfers in progress reported by the DataNode
 * @param xceiverCount xceiver count reported by the DataNode
 * @param failedVolumes number of failed volumes reported by the DataNode
 * @param volumeFailureSummary volume-failure details (may be absent)
 * @return the response produced by {@code namesystem.handleHeartbeat}
 * @throws IOException declared by this method; propagated to the caller
 */
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary)
    throws IOException {
  // Preconditions: NameNode startup check, then request validation.
  checkNNStartup();
  verifyRequest(nodeReg);

  // Delegate the actual heartbeat processing. Note the argument order:
  // handleHeartbeat takes xceiverCount before xmitsInProgress.
  final HeartbeatResponse response = namesystem.handleHeartbeat(
      nodeReg,
      report,
      dnCacheCapacity,
      dnCacheUsed,
      xceiverCount,
      xmitsInProgress,
      failedVolumes,
      volumeFailureSummary);
  return response;
}
方法内进行了一些必要的检查,包括NameNode是否启动,对请求参数的校验,最后调用了核心方法handleHeartbeat()
/**
 * Processes one DataNode heartbeat while holding the read lock: asks the
 * DatanodeManager for the commands to send back, builds the HA status, and
 * wraps both (plus {@code rollingUpgradeInfo}) in a HeartbeatResponse.
 *
 * @param nodeReg registration of the DataNode sending the heartbeat
 * @param reports storage reports supplied by the DataNode
 * @param cacheCapacity cache capacity reported by the DataNode
 * @param cacheUsed cache usage reported by the DataNode
 * @param xceiverCount xceiver count reported by the DataNode
 * @param xmitsInProgress transfers in progress reported by the DataNode
 * @param failedVolumes number of failed volumes reported by the DataNode
 * @param volumeFailureSummary volume-failure details (may be absent)
 * @return the response to send back to the DataNode
 * @throws IOException declared by this method; propagated to the caller
 */
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  readLock();
  try {
    // Remaining transfer budget: the configured maximum number of
    // replication streams minus the transfers already in progress.
    final int transferBudget =
        blockManager.getMaxReplicationStreams() - xmitsInProgress;

    // Let the DatanodeManager process the heartbeat and return the
    // commands destined for the DataNode.
    final DatanodeCommand[] commands =
        blockManager.getDatanodeManager().handleHeartbeat(
            nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
            xceiverCount, transferBudget, failedVolumes,
            volumeFailureSummary);

    // HA status: current service state plus the last applied-or-written txid.
    final NNHAStatusHeartbeat haStatus = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getLastAppliedOrWrittenTxId());

    // Response returned to the DataNode.
    return new HeartbeatResponse(commands, haStatus, rollingUpgradeInfo);
  } finally {
    readUnlock();
  }
}
继续深入handleHeartbeat(),查看NameNode处理心跳的细节
/**
 * Handle heartbeat from datanodes.
 *
 * NOTE: this is an abridged excerpt; the declaration of {@code nodeinfo},
 * the intermediate logic and the return statement are elided.
 */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
// ... (code elided in this excerpt)
// Author's note: using the datanodeUuid sent by the DataNode, the NameNode
// looks up the registered DataNode's information in datanodeMap.
// If the lookup by uuid succeeds, the DataNode has already registered;
// if it fails, the DataNode has not registered.
// Since this is a heartbeat, the lookup is normally expected to succeed.
nodeinfo = getDatanode(nodeReg);
// ... (code elided in this excerpt)
// Let heartbeatManager update the heartbeat-related state for this node
// (author's note: storage information, last heartbeat time, etc.).
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
}
在NameNode启动的时候,实际上NameNode会启动并初始化HttpServer、RPC Server、加载元数据、启动公共服务(包括资源检查、安全模式检查等,其中还包括了心跳处理)
在FSNamesystem中,有一个叫startCommonServices()的方法,这个方法在NameNode启动流程那篇文章中讲过,现在我们来深入分析一下。
/**
 * Start services common to both active and standby states
 *
 * @param conf configuration handed to the resource checker and to
 *             {@code blockManager.activate}
 * @param haContext HA context stored into {@code this.haContext}
 * @throws IOException declared by this method; propagated to the caller
 */
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
// (1) See the NameNodeResourceChecker constructor:
// creates the resource checker.
nnResourceChecker = new NameNodeResourceChecker(conf);
// (2) Author's note: see checkAvailableResources(), which verifies that
// there is enough disk space to store the metadata (FSImage + editlog).
// NOTE(review): the call itself does not appear in this excerpt.
assert safeMode != null && !isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
getCompleteBlocksTotal());
// (3) Author's note: entry point into HDFS safe-mode handling.
setBlockTotal();
// (4) Author's note: this starts the heartbeat management machinery.
blockManager.activate(conf);
} finally {
// Always release the write lock taken above.
writeUnlock();
}
registerMXBean();
DefaultMetricsSystem.instance().register(this);
// Start and install the INode attribute provider only when one is set.
if (inodeAttributeProvider != null) {
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
snapshotManager.registerMXBean();
}
上面代码 (4)启动了心跳管理机制,深入activate方法
/**
 * Starts the pending-replication component, activates the
 * {@code datanodeManager}, and starts the replication thread, in that order.
 *
 * @param conf configuration passed on to {@code datanodeManager.activate}
 */
public void activate(Configuration conf) {
// Start the pending-replication thread.
pendingReplications.start();
// Start the heartbeat-management service.
datanodeManager.activate(conf);
this.replicationThread.start();
}
继续深入datanodeManager.activate(conf),其内部会进一步调用heartbeatManager.activate(conf),后者调用了heartbeatThread的start方法:
/**
 * Starts the heartbeat thread.
 *
 * NOTE(review): {@code heartbeatThread} is not declared in this excerpt, and
 * {@code conf} is unused here; this looks like HeartbeatManager's activate
 * rather than DatanodeManager's - confirm against the full source.
 */
void activate(Configuration conf) {
heartbeatThread.start();
}
既然调用了线程的start方法,那么应该关注该线程所执行的run方法(位于HeartbeatManager的内部类Monitor中),它是逻辑的具体实现:
/**
 * Periodically check heartbeat and update block key.
 *
 * <p>Each round of the loop (a) runs {@code heartbeatCheck()} when more than
 * {@code heartbeatRecheckInterval} has passed since the previous check,
 * (b) flags every tracked datanode for a block-key update when
 * {@code blockManager.shouldUpdateBlockKey} says so, and (c) sleeps before
 * the next round. The loop ends once {@code namesystem.isRunning()} is false.
 */
private class Monitor implements Runnable {
  /** Monotonic timestamp of the most recent heartbeat check. */
  private long lastHeartbeatCheck;
  /** Monotonic timestamp of the most recent block-key update flagging. */
  private long lastBlockKeyUpdate;

  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        final long currentTime = Time.monotonicNow();

        // Note: the gate is the *recheck* interval, not the DataNode's
        // heartbeat period.
        final boolean recheckDue =
            lastHeartbeatCheck + heartbeatRecheckInterval < currentTime;
        if (recheckDue) {
          heartbeatCheck();
          lastHeartbeatCheck = currentTime;
        }

        final long sinceLastKeyUpdate = currentTime - lastBlockKeyUpdate;
        if (blockManager.shouldUpdateBlockKey(sinceLastKeyUpdate)) {
          // Mark every tracked datanode while holding the manager's monitor.
          synchronized (HeartbeatManager.this) {
            for (DatanodeDescriptor node : datanodes) {
              node.needKeyUpdate = true;
            }
          }
          lastBlockKeyUpdate = currentTime;
        }
      } catch (Exception e) {
        // A failure in one round is logged; the loop carries on.
        LOG.error("Exception while checking heartbeat", e);
      }
      pauseBeforeNextRound();
    }
  }

  /**
   * Sleeps between two rounds of the monitor loop. An interruption is
   * ignored here; the caller's loop condition is simply re-evaluated.
   */
  private void pauseBeforeNextRound() {
    try {
      Thread.sleep(5000); // 5 seconds
    } catch (InterruptedException ie) {
      // Ignored, as in the original loop body.
    }
  }
}
可以看到,HeartbeatManager内部包含了一个名为Monitor的类,它实现了Runnable接口,在while循环内部周期性地进行心跳检查,具体的检查逻辑是通过调用heartbeatCheck方法完成的:
/**
 * Scans the tracked datanodes, looking for a dead node and a failed storage
 * and counting stale nodes and stale storages, then publishes the stale
 * counts to the DatanodeManager.
 *
 * NOTE: this is an abridged excerpt; the declarations of {@code dead},
 * {@code failedStorage}, {@code numOfStaleNodes} and
 * {@code numOfStaleStorages}, and whatever is done with the dead node or
 * failed storage afterwards, are elided.
 */
void heartbeatCheck() {
final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
// ... (code elided in this excerpt)
synchronized(this) {
// Walk every DatanodeDescriptor in the datanodes collection.
// Author's note: when a DataNode registers, the NameNode places its
// registration information into HeartbeatManager's datanodes structure;
// later heartbeats from the DataNode are checked against, and update,
// that same structure.
for (DatanodeDescriptor d : datanodes) {
// Record only the first node that dm.isDatanodeDead reports as dead
// (i.e. one whose heartbeat has timed out).
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
}
// Count nodes that are stale with respect to the configured interval.
if (d.isStale(dm.getStaleInterval())) {
numOfStaleNodes++;
}
DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
for(DatanodeStorageInfo storageInfo : storageInfos) {
// Count storages whose block contents are stale.
if (storageInfo.areBlockContentsStale()) {
numOfStaleStorages++;
}
// Record only the first failed storage, skipping storages that belong
// to the node already picked as dead.
if (failedStorage == null &&
storageInfo.areBlocksOnFailedStorage() &&
d != dead) {
failedStorage = storageInfo;
}
}
}
// Set the number of stale nodes in the DatanodeManager
dm.setNumStaleNodes(numOfStaleNodes);
dm.setNumStaleStorages(numOfStaleStorages);
}
}
判断datanode挂掉的依据:默认配置下,当namenode在630s(即10分30秒 = 2 × 5分钟 + 10 × 3秒)内没有收到datanode的心跳信息时,才认为该datanode挂掉
/**
 * Is the datanode dead?
 *
 * <p>A node counts as dead when its last update (monotonic clock) is older
 * than {@code heartbeatExpireInterval} relative to now.
 *
 * <p>Author's note: {@code heartbeatExpireInterval =
 * 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds}.
 * With the defaults cited in this article (recheck interval 5*60*1000 ms,
 * heartbeat every 3 seconds) that is 630,000 ms, i.e. 10 minutes 30 seconds.
 *
 * @param node descriptor of the datanode to test
 * @return true when the node's last update is older than the expiry window
 */
boolean isDatanodeDead(DatanodeDescriptor node) {
  // Read the node's timestamp first, then the clock, as the original
  // expression does.
  final long lastUpdate = node.getLastUpdateMonotonic();
  final long oldestLiveTimestamp = monotonicNow() - heartbeatExpireInterval;
  return lastUpdate < oldestLiveTimestamp;
}
二、流程总结
结合DataNode的注册流程,我们继续完善DataNode在启动之后的关键步骤
最后
以上就是美好麦片为你收集整理的《HDFS核心源码解析(三)——DataNode的心跳机制》的全部内容,希望文章能够帮你解决所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复