Preface
When HDFS starts up, you may have noticed a small detail: the NameNode is always started first, and the DataNodes after it. A moment's thought reveals why. The NameNode maintains the filesystem metadata, and much of that metadata can only be built up from the reports sent in by the DataNodes as they come online; afterwards, the block-to-node mapping is maintained and refreshed through regular heartbeats. This article gives a complete walkthrough of both flows: node registration and the heartbeat mechanism.
Related Classes
As usual, a quick introduction to the classes involved, so you have a rough map before reading the code. The main ones are:
1. DataNode -- the datanode class. Note that this is not the DatanodeDescriptor class covered earlier; it defines many of the datanode-side operations.
2. NameNode -- the namenode class. Both registration requests and heartbeats are handled by it, and its handlers delegate to methods on FSNamesystem.
3. DatanodeCommand and BlockCommand -- the datanode command class and its block-oriented subclass, used by the namenode to carry commands back to datanodes in heartbeat replies.
4. FSNamesystem and DatanodeDescriptor -- supporting classes whose methods are called throughout the flows above.
That is the whole cast, and it is not a long one. On to the first flow, node registration: how does a datanode register itself with the namenode at startup?
Node Registration
Registration takes place once the datanode process starts. Execution enters through the main method:
public static void main(String args[]) {
  secureMain(args, null);
}

public static void secureMain(String [] args, SecureResources resources) {
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    DataNode datanode = createDataNode(args, null, resources);
    if (datanode != null)
      datanode.join();
  } catch (Throwable e) {
    LOG.error(StringUtils.stringifyException(e));
    System.exit(-1);
  } finally {
    // We need to add System.exit here because either shutdown was called or
    // some disk related conditions like volumes tolerated or volumes required
    // condition was not met. Also, In secure mode, control will go to Jsvc and
    // the process hangs without System.exit.
    LOG.info("Exiting Datanode");
    System.exit(0);
  }
}
/** Start a single datanode daemon and wait for it to finish.
 *  If this thread is specifically interrupted, it will stop waiting.
 *  This is the core method for starting a datanode.
 */
public static void runDatanodeDaemon(DataNode dn) throws IOException {
  if (dn != null) {
    // register datanode: first register the node with the namenode
    dn.register();
    // then start the worker thread
    dn.dataNodeThread = new Thread(dn, dnThreadName);
    dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
    dn.dataNodeThread.start();
  }
}
/**
 * Register datanode
 * <p>
 * The datanode needs to register with the namenode on startup in order
 * 1) to report which storage it is serving now and
 * 2) to receive a registrationID
 * issued by the namenode to recognize registered datanodes.
 *
 * @see FSNamesystem#registerDatanode(DatanodeRegistration)
 * @throws IOException
 * The datanode's registration method; it ends up calling the
 * registration method on the namenode.
 */
private void register() throws IOException {
  if (dnRegistration.getStorageID().equals("")) {
    setNewStorageID(dnRegistration);
  }
  while(shouldRun) {
    try {
      // reset name to machineName. Mainly for web interface.
      dnRegistration.name = machineName + ":" + dnRegistration.getPort();
      // call the registration method on the namenode
      dnRegistration = namenode.register(dnRegistration);
      break;
      ....
// DatanodeProtocol
/**
 * The namenode's registration method; it delegates to FSNamesystem.
 */
public DatanodeRegistration register(DatanodeRegistration nodeReg
                                     ) throws IOException {
  // verify the protocol version first
  verifyVersion(nodeReg.getVersion());
  // then delegate to the namesystem's registerDatanode method
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}
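For context, the `namenode` handle that register() is invoked on is a DatanodeProtocol RPC proxy. Below is a minimal sketch of how that proxy is obtained, modeled on DataNode.startDataNode in Hadoop 1.x; the host name and port are illustrative assumptions, not values from the source.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.ipc.RPC;

// Sketch: obtaining the DatanodeProtocol proxy behind namenode.register(...)
// and namenode.sendHeartbeat(...). Address and port below are assumptions.
public class NamenodeProxySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InetSocketAddress nnAddr =
        new InetSocketAddress("namenode.example.com", 8020);
    // waitForProxy blocks until the namenode is reachable, which is why the
    // datanode can tolerate the namenode still coming online.
    DatanodeProtocol namenode = (DatanodeProtocol) RPC.waitForProxy(
        DatanodeProtocol.class, DatanodeProtocol.versionID, nnAddr, conf);
    // From here, dn.register() boils down to:
    //   dnRegistration = namenode.register(dnRegistration);
  }
}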
The namenode in turn delegates to the namesystem, where a registering node falls into one of three cases (a toy walkthrough of these cases follows below):
1. An existing node registering with a new storage ID.
2. A repeat registration from an existing node; the cluster already holds its record, so only its network location needs updating.
3. A node that has never registered before, which is simply assigned a new storage ID.
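To make the three cases concrete before reading the real method, here is a tiny self-contained simulation. It is purely illustrative: the storage-ID format and the two maps are stand-ins, not the real HDFS structures, which follow below.

import java.util.HashMap;
import java.util.Map;

// Toy simulation of the three registration cases (illustrative only).
public class RegisterCasesDemo {
  // storageID -> "host:port", mimicking datanodeMap
  static Map<String, String> datanodeMap = new HashMap<String, String>();
  // "host:port" -> storageID, mimicking host2DataNodeMap
  static Map<String, String> hostToStorage = new HashMap<String, String>();

  static String register(String name, String storageID) {
    String oldStorage = hostToStorage.get(name);
    if (oldStorage != null && !oldStorage.equals(storageID)) {
      // case 1: a known host:port arrives with a different storage ID,
      // so its stale record is retired first
      datanodeMap.remove(oldStorage);
    }
    if (!datanodeMap.containsKey(storageID) && storageID.isEmpty()) {
      // case 3: never registered before -> issue a fresh storage ID
      storageID = "DS-" + System.nanoTime();
    }
    // case 2 (re-registration) and the others all end by refreshing the maps
    datanodeMap.put(storageID, name);
    hostToStorage.put(name, storageID);
    return storageID;
  }

  public static void main(String[] args) {
    String id = register("dn1:50010", ""); // case 3: brand-new node
    register("dn1:50010", id);             // case 2: restart, same storage
    register("dn1:50010", "");             // case 1: same node, wiped disk
    System.out.println(datanodeMap);       // exactly one live entry remains
  }
}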
The actual method implementing this decision logic is shown next; it is fairly long.
/**
 * Register Datanode.
 * <p>
 * The purpose of registration is to identify whether the new datanode
 * serves a new data storage, and will report new data block copies,
 * which the namenode was not aware of; or the datanode is a replacement
 * node for the data storage that was previously served by a different
 * or the same (in terms of host:port) datanode.
 * The data storages are distinguished by their storageIDs. When a new
 * data storage is reported the namenode issues a new unique storageID.
 * <p>
 * Finally, the namenode returns its namespaceID as the registrationID
 * for the datanodes.
 * namespaceID is a persistent attribute of the name space.
 * The registrationID is checked every time the datanode is communicating
 * with the namenode.
 * Datanodes with inappropriate registrationID are rejected.
 * If the namenode stops, and then restarts it can restore its
 * namespaceID and will continue serving the datanodes that has previously
 * registered with the namenode without restarting the whole cluster.
 *
 * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
 * This is where the namenode actually carries out datanode registration.
 */
public synchronized void registerDatanode(DatanodeRegistration nodeReg
                                          ) throws IOException {
  String dnAddress = Server.getRemoteAddress();
  if (dnAddress == null) {
    // Mostly called inside an RPC.
    // But if not, use address passed by the data-node.
    dnAddress = nodeReg.getHost();
  }

  // check if the datanode is allowed to be connect to the namenode
  if (!verifyNodeRegistration(nodeReg, dnAddress)) {
    throw new DisallowedDatanodeException(nodeReg);
  }

  String hostName = nodeReg.getHost();

  // update the datanode's name with ip:port
  DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
                                    nodeReg.getStorageID(),
                                    nodeReg.getInfoPort(),
                                    nodeReg.getIpcPort());
  nodeReg.updateRegInfo(dnReg);
  nodeReg.exportedKeys = getBlockKeys();

  NameNode.stateChangeLog.info(
      "BLOCK* NameSystem.registerDatanode: "
      + "node registration from " + nodeReg.getName()
      + " storage " + nodeReg.getStorageID());

  // look the node up both by storage ID and by host name
  DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
  DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());

  // has this node registered before under a different storage ID?
  if (nodeN != null && nodeN != nodeS) {
    // case 1: the node exists but has come back with a new storage ID
    NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
                      + "node from name: " + nodeN.getName());
    // nodeN previously served a different data storage,
    // which is not served by anybody anymore.
    // drop the stale datanode record
    removeDatanode(nodeN);
    // physically remove node from datanodeMap
    wipeDatanode(nodeN);
    nodeN = null;
  }

  // case 2: a repeat registration
  if (nodeS != null) {
    if (nodeN == nodeS) {
      // The same datanode has been just restarted to serve the same data
      // storage. We do not need to remove old data blocks, the delta will
      // be calculated on the next block report from the datanode
      NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
          + "node restarted.");
    } else {
      // nodeS is found
      /* The registering datanode is a replacement node for the existing
        data storage, which from now on will be served by a new node.
        If this message repeats, both nodes might have same storageID
        by (insanely rare) random chance. User needs to restart one of the
        nodes with its data cleared (or user can just remove the StorageID
        value in "VERSION" file under the data directory of the datanode,
        but this is might not work if VERSION file format has changed
      */
      NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
          + "node " + nodeS.getName()
          + " is replaced by " + nodeReg.getName() +
          " with the same storageID " +
          nodeReg.getStorageID());
    }
    // update cluster map: refresh this node's network-topology record
    clusterMap.remove(nodeS);
    nodeS.updateRegInfo(nodeReg);
    nodeS.setHostName(hostName);
    // resolve network location
    resolveNetworkLocation(nodeS);
    clusterMap.add(nodeS);

    // also treat the registration message as a heartbeat
    synchronized(heartbeats) {
      if( !heartbeats.contains(nodeS)) {
        heartbeats.add(nodeS);
        //update its timestamp
        nodeS.updateHeartbeat(0L, 0L, 0L, 0);
        nodeS.isAlive = true;
      }
    }
    return;
  }

  // this is a new datanode serving a new data storage
  // case 3: a genuinely new node, so assign it a fresh storage ID
  if (nodeReg.getStorageID().equals("")) {
    // this data storage has never been registered
    // it is either empty or was created by pre-storageID version of DFS
    nodeReg.storageID = newStorageID();
    NameNode.stateChangeLog.debug(
        "BLOCK* NameSystem.registerDatanode: "
        + "new storageID " + nodeReg.getStorageID() + " assigned.");
  }

  // register new datanode: create the descriptor for the new node
  DatanodeDescriptor nodeDescr
      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
  resolveNetworkLocation(nodeDescr);
  unprotectedAddDatanode(nodeDescr);
  clusterMap.add(nodeDescr);

  // also treat the registration message as a heartbeat
  synchronized(heartbeats) {
    heartbeats.add(nodeDescr);
    nodeDescr.isAlive = true;
    // no need to update its timestamp
    // because its is done when the descriptor is created
  }
  return;
}
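For reference, the storage-ID generator invoked above looks roughly like this; it is reconstructed from memory of the Hadoop 1.x FSNamesystem (`r` is the namesystem's Random instance), so verify it against the actual source.

/** Generate a storage ID guaranteed to be unique in this cluster (sketch). */
private String newStorageID() {
  String newID = null;
  while (newID == null) {
    newID = "DS" + Integer.toString(r.nextInt());
    // extremely unlikely, but re-roll on a collision with a known node
    if (datanodeMap.get(newID) != null)
      newID = null;
  }
  return newID;
}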
/**
 * remove a datanode descriptor
 * @param nodeInfo datanode descriptor
 */
private void removeDatanode(DatanodeDescriptor nodeInfo) {
  synchronized (heartbeats) {
    if (nodeInfo.isAlive) {
      // update the cluster-wide usage statistics
      updateStats(nodeInfo, false);
      // drop this node from the heartbeat list
      heartbeats.remove(nodeInfo);
      nodeInfo.isAlive = false;
    }
  }
  // remove the block-to-node mappings (the "second relationship") for this node
  for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
    removeStoredBlock(it.next(), nodeInfo);
  }
  unprotectedRemoveDatanode(nodeInfo);
  // remove the node from the cluster topology map
  clusterMap.remove(nodeInfo);
}
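The updateStats call seen here simply folds the node's figures into, or out of, the cluster-wide totals. A reconstruction consistent with the Hadoop 1.x source (field names as remembered; treat as a sketch):

// Sketch of FSNamesystem.updateStats: adjust cluster totals by one node's
// numbers. Must be called with the heartbeats lock held.
private void updateStats(DatanodeDescriptor node, boolean isAdded) {
  assert Thread.holdsLock(heartbeats);
  if (isAdded) {
    capacityTotal += node.getCapacity();
    capacityUsed += node.getDfsUsed();
    capacityRemaining += node.getRemaining();
    totalLoad += node.getXceiverCount();
  } else {
    capacityTotal -= node.getCapacity();
    capacityUsed -= node.getDfsUsed();
    capacityRemaining -= node.getRemaining();
    totalLoad -= node.getXceiverCount();
  }
}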
Heartbeat Mechanism
The heartbeat exists, at its simplest, to prove that a datanode is still alive: if a datanode has not sent a heartbeat to the namenode within a certain window, it is marked dead. The datanode also receives command information in the heartbeat reply and acts on it, so the heartbeat mechanism plays a central role throughout HDFS. Let's unwrap its implementation step by step.
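How long can a datanode stay silent before being declared dead? In Hadoop 1.x the threshold is derived as heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * heartbeatInterval, which with default configuration works out to 10.5 minutes. A self-contained sketch of the arithmetic (config names and defaults are the usual 1.x values, quoted from memory):

// Sketch of the dead-node timeout arithmetic used by the HeartbeatMonitor.
public class HeartbeatExpiryDemo {
  public static void main(String[] args) {
    long heartbeatIntervalMs = 3 * 1000L;      // dfs.heartbeat.interval: 3 s
    long recheckIntervalMs = 5 * 60 * 1000L;   // heartbeat.recheck.interval: 5 min
    long expireIntervalMs = 2 * recheckIntervalMs + 10 * heartbeatIntervalMs;
    // With the defaults this prints 630000 ms, i.e. 10.5 minutes: a datanode
    // silent for that long is declared dead by heartbeatCheck().
    System.out.println("heartbeatExpireInterval = " + expireIntervalMs + " ms");
  }
}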
The heartbeat is initiated by the datanode, so the active side lives in DataNode, in the method below.
/**
 * Main loop for the DataNode. Runs until shutdown,
 * forever calling remote NameNode functions.
 * The datanode keeps sending heartbeats to the namenode inside this loop.
 */
public void offerService() throws Exception {
  LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
           " Initial delay: " + initialBlockReportDelay + "msec");
  //
  // Now loop for a long time....
  //
  while (shouldRun) {
    try {
      long startTime = now();
      //
      // Every so often, send heartbeat or block-report
      //
      if (startTime - lastHeartbeat > heartBeatInterval) {
        //
        // All heartbeat messages include following info:
        // -- Datanode name
        // -- data transfer port
        // -- Total capacity
        // -- Bytes remaining
        //
        // report this node's current state: capacity, DFS usage, remaining space, ...
        lastHeartbeat = startTime;
        // namenode.sendHeartbeat delivers the heartbeat and returns the
        // commands this datanode should execute
        DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                        data.getCapacity(),
                                                        data.getDfsUsed(),
                                                        data.getRemaining(),
                                                        xmitsInProgress.get(),
                                                        getXceiverCount());
        myMetrics.addHeartBeat(now() - startTime);
        //LOG.info("Just sent heartbeat, with name " + localName);
        // process the returned commands; on failure, skip the block-report
        // work further down and retry the loop
        if (!processCommand(cmds))
          continue;
      }
On the namenode side, the RPC lands in NameNode.sendHeartbeat:

/**
 * Data node notify the name node that it is alive
 * Return an array of block-oriented commands for the datanode to execute.
 * This will be either a transfer or a delete operation.
 * Called by the datanode to deliver its heartbeat.
 */
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
                                       long capacity,
                                       long dfsUsed,
                                       long remaining,
                                       int xmitsInProgress,
                                       int xceiverCount) throws IOException {
  // confirm that the node is properly registered
  verifyRequest(nodeReg);
  // then delegate to FSNamesystem.handleHeartbeat
  return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
      xceiverCount, xmitsInProgress);
}
/**
 * The given node has reported in. This method should:
 * 1) Record the heartbeat, so the datanode isn't timed out
 * 2) Adjust usage stats for future block allocation
 *
 * If a substantial amount of time passed since the last datanode
 * heartbeat then request an immediate block report.
 *
 * @return an array of datanode commands
 * @throws IOException
 * Handles the heartbeat reported by a given datanode. It mainly does two things:
 * 1. records the heartbeat, so the datanode does not time out;
 * 2. adjusts the block-allocation statistics maintained by the namenode.
 */
DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    long capacity, long dfsUsed, long remaining,
    int xceiverCount, int xmitsInProgress) throws IOException {
  DatanodeCommand cmd = null;
  synchronized (heartbeats) {
    synchronized (datanodeMap) {
      DatanodeDescriptor nodeinfo = null;
      try {
        nodeinfo = getDatanode(nodeReg);
      } catch(UnregisteredDatanodeException e) {
        return new DatanodeCommand[]{DatanodeCommand.REGISTER};
      }

      // Check if this datanode should actually be shutdown instead.
      if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
        setDatanodeDead(nodeinfo);
        throw new DisallowedDatanodeException(nodeinfo);
      }

      // an unknown or dead node has not (re)registered yet: send it a
      // REGISTER command
      if (nodeinfo == null || !nodeinfo.isAlive) {
        return new DatanodeCommand[]{DatanodeCommand.REGISTER};
      }

      updateStats(nodeinfo, false);
      nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
      updateStats(nodeinfo, true);
      ...........
      // check lease recovery: lease-recovery commands take priority
      cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
      if (cmd != null) {
        return new DatanodeCommand[] {cmd};
      }

      // collect the commands to return; a heartbeat reply can carry several
      ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();

      // check pending replication: fetch the replication command, which
      // carries the list of blocks to replicate
      cmd = nodeinfo.getReplicationCommand(
          maxReplicationStreams - xmitsInProgress);
      if (cmd != null) {
        cmds.add(cmd);
      }

      // check block invalidation: the block-deletion command
      cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
      if (cmd != null) {
        cmds.add(cmd);
      }

      // check access key update
      if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
        cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
        nodeinfo.needKeyUpdate = false;
      }

      // check for balancer bandwidth update
      if (nodeinfo.getBalancerBandwidth() > 0) {
        cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
        // set back to 0 to indicate that datanode has been sent the new value
        nodeinfo.setBalancerBandwidth(0);
      }

      if (!cmds.isEmpty()) {
        // return the batch of commands
        return cmds.toArray(new DatanodeCommand[cmds.size()]);
      }
    }
  }
// the block-command helper on DatanodeDescriptor
BlockCommand getReplicationCommand(int maxTransfers) {
  // fetch the list of blocks waiting to be replicated
  List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
  // wrap the list in a BlockCommand and return it
  return blocktargetlist == null? null:
      new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
}
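The BlockTargetPair elements polled here are a simple block-plus-targets pairing. Reconstructed from memory of DatanodeDescriptor in Hadoop 1.x (compare with the actual source):

/** A pairing of a block with the datanodes it should be transferred to
 *  (sketch of DatanodeDescriptor.BlockTargetPair). */
public static class BlockTargetPair {
  public final Block block;
  public final DatanodeInfo[] targets;

  BlockTargetPair(Block block, DatanodeInfo[] targets) {
    this.block = block;
    this.targets = targets;
  }
}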
// DatanodeCommand implements Writable, i.e. commands are serialized for transport
public abstract class DatanodeCommand implements Writable {
  static class Register extends DatanodeCommand {
    private Register() {super(DatanodeProtocol.DNA_REGISTER);}
    public void readFields(DataInput in) {}
    public void write(DataOutput out) {}
  }

  static class Finalize extends DatanodeCommand {
    private Finalize() {super(DatanodeProtocol.DNA_FINALIZE);}
    public void readFields(DataInput in) {}
    public void write(DataOutput out) {}
  }
  .....
  // action holds the command's operation type
  private int action;
  .....
  ///////////////////////////////////////////
  // Writable
  ///////////////////////////////////////////
  // DatanodeCommand writes the action code into the serialization stream
  public void write(DataOutput out) throws IOException {
    out.writeInt(this.action);
  }

  public void readFields(DataInput in) throws IOException {
    this.action = in.readInt();
  }
}
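Since a parameter-less command is nothing but its action code on the wire, the round trip can be demonstrated with plain java.io streams. A self-contained sketch; the DNA_REGISTER value of 4 is quoted from memory of DatanodeProtocol and should be double-checked:

import java.io.*;

// Round-trip sketch of the DatanodeCommand wire format shown above.
public class CommandWireDemo {
  static final int DNA_REGISTER = 4; // assumed value, see DatanodeProtocol

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(DNA_REGISTER);                 // what DatanodeCommand.write() does
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    int action = in.readInt();                  // what readFields() does
    System.out.println("decoded action = " + action); // -> 4
  }
}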
/****************************************************
 * A BlockCommand is an instruction to a datanode
 * regarding some blocks under its control. It tells
 * the DataNode to either invalidate a set of indicated
 * blocks, or to copy a set of indicated blocks to
 * another DataNode.
 *
 ****************************************************/
public class BlockCommand extends DatanodeCommand {
  Block blocks[];
  DatanodeInfo targets[][];

  public BlockCommand() {}

  /**
   * Create BlockCommand for transferring blocks to another datanode
   * @param blocktargetlist blocks to be transferred
   */
  public BlockCommand(int action, List<BlockTargetPair> blocktargetlist) {
    super(action);
    blocks = new Block[blocktargetlist.size()];
    targets = new DatanodeInfo[blocks.length][];
    for(int i = 0; i < blocks.length; i++) {
      BlockTargetPair p = blocktargetlist.get(i);
      blocks[i] = p.block;
      targets[i] = p.targets;
    }
  }

  private static final DatanodeInfo[][] EMPTY_TARGET = {};

  /**
   * Create BlockCommand for the given action
   * @param blocks blocks related to the action
   */
  public BlockCommand(int action, Block blocks[]) {
    super(action);
    this.blocks = blocks;
    this.targets = EMPTY_TARGET;
  }
  ......
  // override the serialization write method
  public void write(DataOutput out) throws IOException {
    super.write(out);
    out.writeInt(blocks.length);
    // serialize the blocks one by one
    for (int i = 0; i < blocks.length; i++) {
      blocks[i].write(out);
    }
    out.writeInt(targets.length);
    for (int i = 0; i < targets.length; i++) {
      out.writeInt(targets[i].length);
      for (int j = 0; j < targets[i].length; j++) {
        targets[i][j].write(out);
      }
    }
  }
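The matching readFields is the mirror image of write: lengths first, then elements. The version below is reconstructed from the write() logic above; compare it with the actual BlockCommand source before relying on it.

// deserialization: read back exactly what write() emitted
public void readFields(DataInput in) throws IOException {
  super.readFields(in);                 // restores the action code
  this.blocks = new Block[in.readInt()];
  for (int i = 0; i < blocks.length; i++) {
    blocks[i] = new Block();
    blocks[i].readFields(in);
  }
  this.targets = new DatanodeInfo[in.readInt()][];
  for (int i = 0; i < targets.length; i++) {
    this.targets[i] = new DatanodeInfo[in.readInt()];
    for (int j = 0; j < targets[i].length; j++) {
      targets[i][j] = new DatanodeInfo();
      targets[i][j].readFields(in);
    }
  }
}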
/**
 * Process an array of datanode commands
 *
 * @param cmds an array of datanode commands
 * @return true if further processing may be required or false otherwise.
 * Executes a batch of commands on the datanode.
 */
private boolean processCommand(DatanodeCommand[] cmds) {
  if (cmds != null) {
    for (DatanodeCommand cmd : cmds) {
      try {
        // if any one command in the batch fails, the whole batch is
        // treated as failed
        if (processCommand(cmd) == false) {
          return false;
        }
      } catch (IOException ioe) {
        LOG.warn("Error processing datanode Command", ioe);
      }
    }
  }
  return true;
}
/**
 *
 * @param cmd
 * @return true if further processing may be required or false otherwise.
 * @throws IOException
 * Processes a single command.
 */
private boolean processCommand(DatanodeCommand cmd) throws IOException {
  if (cmd == null)
    return true;
  final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;

  // dispatch on the command's action type
  switch(cmd.getAction()) {
  case DatanodeProtocol.DNA_TRANSFER:
    // Send a copy of a block to another datanode
    ....
    break;
  case DatanodeProtocol.DNA_INVALIDATE:
    // obsolete blocks: let the blockScanner drop them, then delete the data
    //
    // Some local block(s) are obsolete and can be
    // safely garbage-collected.
    //
    Block toDelete[] = bcmd.getBlocks();
    try {
      if (blockScanner != null) {
        blockScanner.deleteBlocks(toDelete);
      }
      data.invalidate(toDelete);
    } catch(IOException e) {
      checkDiskError();
      throw e;
    }
    myMetrics.incrBlocksRemoved(toDelete.length);
    break;
  case DatanodeProtocol.DNA_SHUTDOWN:
    ....
  case DatanodeProtocol.DNA_REGISTER:
    // a register command triggers a fresh registration
    .....
    break;
  case DatanodeProtocol.DNA_FINALIZE:
    storage.finalizeUpgrade();
    break;
  case UpgradeCommand.UC_ACTION_START_UPGRADE:
    // start distributed upgrade here
    processDistributedUpgradeCommand((UpgradeCommand)cmd);
    break;
  case DatanodeProtocol.DNA_RECOVERBLOCK:
    recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
    break;
  case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
    LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
    if (isBlockTokenEnabled) {
      blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
    }
    break;
  case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
    .....
  default:
    LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
  }
  return true;
}
For each command, the action code is extracted and handled in its own case. That covers everything the first half of the loop does. What follows in offerService is the reporting of newly received blocks; strictly speaking this is no longer part of the heartbeat mechanism, but it runs inside the same big loop.
// check if there are newly received blocks
Block[] blockArray = null;
String[] delHintArray = null;
synchronized(receivedBlockList) {
  synchronized(delHints) {
    int numBlocks = receivedBlockList.size();
    if (numBlocks > 0) {
      if(numBlocks != delHints.size()) {
        LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
      }
      //
      // Send newly-received blockids to namenode
      //
      blockArray = receivedBlockList.toArray(new Block[numBlocks]);
      delHintArray = delHints.toArray(new String[numBlocks]);
    }
  }
}
if (blockArray != null) {
  if(delHintArray == null || delHintArray.length != blockArray.length ) {
    LOG.warn("Panic: block array & delHintArray are not the same" );
  }
  // report the newly received blocks to the namenode
  namenode.blockReceived(dnRegistration, blockArray, delHintArray);
  synchronized (receivedBlockList) {
    synchronized (delHints) {
      for(int i = 0; i < blockArray.length; i++) {
        receivedBlockList.remove(blockArray[i]);
        delHints.remove(delHintArray[i]);
      }
    }
  }
}
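On the receiving end, NameNode.blockReceived verifies the caller and hands each block to the namesystem. A sketch consistent with my reading of the Hadoop 1.x source (check the exact log message against NameNode.java):

/** NameNode side of the blockReceived RPC (sketch). */
public void blockReceived(DatanodeRegistration nodeReg,
                          Block blocks[],
                          String delHints[]) throws IOException {
  verifyRequest(nodeReg);
  stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
      + "from " + nodeReg.getName() + " " + blocks.length + " blocks.");
  // record each newly received replica in the blocks map
  for (int i = 0; i < blocks.length; i++) {
    namesystem.blockReceived(nodeReg, blocks[i], delHints[i]);
  }
}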
Back on the namenode, the monitoring side of the heartbeat mechanism is built around the heartbeats list and the HeartbeatMonitor thread:

/**
 * Stores a set of DatanodeDescriptor objects.
 * This is a subset of {@link #datanodeMap}, containing nodes that are
 * considered alive.
 * The {@link HeartbeatMonitor} periodically checks for outdated entries,
 * and removes them from the list.
 * In other words: the subset of datanodeMap holding the nodes currently
 * considered alive, which the HeartbeatMonitor works on.
 */
ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();

/**
 * Periodically calls heartbeatCheck() and updateAccessKey()
 * The heartbeat-monitoring thread.
 */
class HeartbeatMonitor implements Runnable {
  // time of the last heartbeat check
  private long lastHeartbeatCheck;
  private long lastAccessKeyUpdate;
  /**
   */
  public void run() {
    while (fsRunning) {
      try {
        long now = now();
        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
          // the recheck interval has elapsed: run the heartbeat check
          heartbeatCheck();
          lastHeartbeatCheck = now;
        }
        if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
          updateAccessKey();
          lastAccessKeyUpdate = now;
        }
      } catch (Exception e) {
        FSNamesystem.LOG.error(StringUtils.stringifyException(e));
      }
      ....
/**
 * Check if there are any expired heartbeats, and if so,
 * whether any blocks have to be re-replicated.
 * While removing dead datanodes, make sure that only one datanode is marked
 * dead at a time within the synchronized section. Otherwise, a cascading
 * effect causes more datanodes to be declared dead.
 * Scans for expired heartbeats and removes nodes in the dead state.
 */
void heartbeatCheck() {
  // no checking is done while in safe mode
  if (isInSafeMode()) {
    // not to check dead nodes if in safemode
    return;
  }
  boolean allAlive = false;
  while (!allAlive) {
    boolean foundDead = false;
    DatanodeID nodeID = null;

    // locate the first dead node.
    synchronized(heartbeats) {
      for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
           it.hasNext();) {
        DatanodeDescriptor nodeInfo = it.next();
        if (isDatanodeDead(nodeInfo)) {
          // found one: remember it and break out of the scan
          foundDead = true;
          nodeID = nodeInfo;
          break;
        }
      }
    }

    // acquire the fsnamesystem lock, and then remove the dead node.
    if (foundDead) {
      // take the locks so removal of the dead node is properly synchronized
      synchronized (this) {
        synchronized(heartbeats) {
          synchronized (datanodeMap) {
            DatanodeDescriptor nodeInfo = null;
            try {
              nodeInfo = getDatanode(nodeID);
            } catch (IOException e) {
              nodeInfo = null;
            }
            if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
              NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
                  + "lost heartbeat from " + nodeInfo.getName());
              // remove the node; this also drops it from the heartbeat list
              removeDatanode(nodeInfo);
            }
          }
        }
      }
    }
    // reset the loop flag
    allAlive = !foundDead;
  }
}
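The isDatanodeDead predicate used in the scan above is a one-liner comparing the node's last update against the expiry interval derived earlier. A reconstruction (see FSNamesystem for the exact form):

// a node is dead once it has been silent longer than heartbeatExpireInterval
private boolean isDatanodeDead(DatanodeDescriptor node) {
  return node.getLastUpdate() < (now() - heartbeatExpireInterval);
}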
For the complete annotated code, see https://github.com/linyiqun/hadoop-hdfs. Analyses of other parts of HDFS will follow in later posts.
References
《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》, 蔡斌 et al.