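I'm 还单身西装, a blogger on 靠谱客. This post covers HDFS Source Code Analysis: DataNode Startup Flow and Heartbeat Mechanism (Part 4), the DataNode startup flow. I'm sharing it in the hope that it serves as a useful reference.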

DataNode Startup Flow

1. Reading the DataNode class comment

1.1 The DataNode class comment

```java
/**********************************************************
 * DataNode is a class (and program) that stores a set of
 * blocks for a DFS deployment.  A single deployment can
 * have one or many DataNodes.  Each DataNode communicates
 * regularly with a single NameNode.  It also communicates
 * with client code and other DataNodes from time to time.
 *
 * TODO (1)
 * A DataNode stores HDFS block files; one file system can have many DataNodes.
 * Each DataNode talks to the NameNode periodically, clients can talk to
 * DataNodes, and DataNodes also talk to each other.
 *
 * DataNodes store a series of named blocks.  The DataNode
 * allows client code to read these blocks, or to write new
 * block data.  The DataNode may also, in response to instructions
 * from its NameNode, delete blocks or copy blocks to/from other
 * DataNodes.
 *
 * TODO (2)
 * A DataNode stores a series of blocks and lets clients read and write them.
 * It also responds to instructions sent by the NameNode,
 * e.g. deleting blocks or copying blocks.
 *
 * Communication always goes DataNode -> NameNode: in HDFS the NameNode never
 * operates on a DataNode directly; its instructions ride on heartbeat responses.
 * Example:
 *   block01 has three replicas and node hadoop03 goes down. The other DataNodes
 *   cannot notice this, but the NameNode can, because hadoop03's heartbeats stop.
 *   To get back to three replicas, the NameNode puts a replication command
 *   into a heartbeat response.
 *
 * The DataNode maintains just one critical table:
 *   block-> stream of bytes (of BLOCK_SIZE or less)
 *
 * TODO (3)
 * The DataNode manages one important table:
 *   block -> stream of bytes, i.e. which bytes belong to which block.
 *
 * This info is stored on a local disk.  The DataNode
 * reports the table's contents to the NameNode upon startup
 * and every so often afterwards.
 *
 * TODO (4)
 * This information lives on local disk. The DataNode reports it to the
 * NameNode at startup and keeps reporting it periodically afterwards.
 *
 * DataNodes spend their lives in an endless loop of asking
 * the NameNode for something to do.  A NameNode cannot connect
 * to a DataNode directly; a NameNode simply returns values from
 * functions invoked by a DataNode.
 *
 * TODO (5)
 * Once started, a DataNode keeps asking the NameNode what it should do: the heartbeat.
 * The NameNode cannot operate on a DataNode directly. When it receives a heartbeat
 * and needs that DataNode to do something, it returns instructions (commands) as
 * the return value; from these the DataNode learns what the NameNode wants.
 *
 * DataNodes maintain an open server socket so that client code
 * or other DataNodes can read/write data.  The host/port for
 * this server is reported to the NameNode, which then sends that
 * information to clients or other DataNodes that might be interested.
 *
 * TODO (6)
 * The DataNode opens a socket service so clients and other DataNodes can read and
 * write data. At startup it reports its hostname and port to the NameNode.
 * A client or DataNode that wants to reach some DataNode therefore first asks the
 * NameNode for the target's hostname and port, and only then connects to it.
 *
 * TODO Summary:
 * 1. A cluster can have many DataNodes; they are what stores the data.
 * 2. Once started, a DataNode talks to the NameNode periodically
 *    (heartbeats, block reports).
 * 3. The NameNode never operates on a DataNode directly; it sends commands
 *    as heartbeat return values.
 * 4. Once started, a DataNode opens socket services (a streaming data-transfer
 *    server and an RPC server) and waits to be called.
 **********************************************************/
```
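The pull model described in the comment (the NameNode never calls a DataNode; it only answers the DataNode's heartbeat with commands) can be illustrated with a toy in-memory sketch. `ToyNameNode`, `ToyDataNode`, `heartbeat` and the string commands are hypothetical names for illustration only; in HDFS the commands are `DatanodeCommand` objects carried in a `HeartbeatResponse` over RPC.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy NameNode: it never calls a DataNode; it only queues
// commands and hands them back as the return value of heartbeat().
class ToyNameNode {
    private final Map<String, Queue<String>> pending = new HashMap<>();

    // The NameNode decides something must happen on a DataNode (e.g. re-replicate).
    synchronized void enqueue(String dataNodeId, String command) {
        pending.computeIfAbsent(dataNodeId, k -> new ArrayDeque<>()).add(command);
    }

    // Invoked BY the DataNode; queued commands travel back in the "response".
    synchronized List<String> heartbeat(String dataNodeId) {
        Queue<String> q = pending.remove(dataNodeId);
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }
}

class ToyDataNode {
    private final String id;
    private final ToyNameNode nameNode;
    final List<String> executed = new ArrayList<>();

    ToyDataNode(String id, ToyNameNode nameNode) {
        this.id = id;
        this.nameNode = nameNode;
    }

    // One iteration of the "endless loop of asking the NameNode for something to do".
    void heartbeatOnce() {
        for (String cmd : nameNode.heartbeat(id)) {
            executed.add(cmd); // a real DataNode would delete or copy blocks here
        }
    }

    public static void main(String[] args) {
        ToyNameNode nn = new ToyNameNode();
        ToyDataNode dn1 = new ToyDataNode("hadoop01", nn);
        nn.enqueue("hadoop01", "REPLICATE block01 -> hadoop02");
        dn1.heartbeatOnce();
        System.out.println(dn1.executed);        // [REPLICATE block01 -> hadoop02]
        dn1.heartbeatOnce();                     // nothing new queued
        System.out.println(dn1.executed.size()); // 1
    }
}
```

The design point is that all traffic is initiated by the DataNode, which is why a NameNode can "control" DataNodes without ever opening a connection to them.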

2. DataNode initialization

2.1 main

```java
public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  secureMain(args, null);
}
```

2.2 secureMain

```java
public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    // TODO initialize the DataNode
    DataNode datanode = createDataNode(args, null, resources);
    if (datanode != null) {
      // TODO block here until the DataNode stops; this is why the
      // DataNode process stays visible in jps
      datanode.join();
    } else {
      errorCode = 1;
    }
  } catch (Throwable e) {
    LOG.fatal("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    // We need to terminate the process here because either shutdown was called
    // or some disk related conditions like volumes tolerated or volumes required
    // condition was not met. Also, In secure mode, control will go to Jsvc
    // and Datanode process hangs if it does not exit.
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}
```
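Why `datanode.join()` keeps the process (and its `jps` entry) alive: `secureMain` simply blocks on the DataNode's service threads, and `terminate(errorCode)` in the `finally` block runs only after they finish. A minimal sketch of the same pattern with plain `java.lang.Thread`; `runAndWait` is a hypothetical helper, and one worker thread stands in for the DataNode's threads.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class JoinPatternSketch {
    // Mirrors secureMain: start the service, block on join(), then derive an exit code.
    static int runAndWait(Runnable service) {
        if (service == null) {
            return 1; // like createDataNode(...) returning null
        }
        Thread worker = new Thread(service, "toy-datanode");
        worker.start();
        try {
            worker.join(); // the main thread parks here until the service stops
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) {
        AtomicBoolean finished = new AtomicBoolean(false);
        int code = runAndWait(() -> finished.set(true));
        System.out.println(code + " " + finished.get()); // 0 true
    }
}
```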

2.3 createDataNode

```java
/** Instantiate & Start a single datanode daemon and wait for it to finish.
 *  If this thread is specifically interrupted, it will stop waiting.
 */
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // TODO instantiate the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // start the background daemon threads
    dn.runDatanodeDaemon();
  }
  return dn;
}
```

2.4 instantiateDataNode

```java
/** Instantiate a single datanode object, along with its secure resources.
 * This must be run by invoking {@link DataNode#runDatanodeDaemon()}
 * subsequently.
 */
public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();

  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }

  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
  // TODO the important call
  return makeInstance(dataLocations, conf, resources);
}
```

2.5 makeInstance

```java
/**
 * Make an instance of DataNode after ensuring that at least one of the
 * given data directories (and their parent directories, if necessary)
 * can be created.
 * @param dataDirs List of directories, where the new DataNode instance should
 * keep its files.
 * @param conf Configuration instance to use.
 * @param resources Secure resources needed to run under Kerberos
 * @return DataNode instance for given list of data dirs and conf, or null if
 * no directory from this directory list can be created.
 * @throws IOException
 */
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  LocalFileSystem localFS = FileSystem.getLocal(conf);
  FsPermission permission = new FsPermission(
      conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
               DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
  // disk check
  DataNodeDiskChecker dataNodeDiskChecker =
      new DataNodeDiskChecker(permission);
  List<StorageLocation> locations =
      checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
  DefaultMetricsSystem.initialize("DataNode");

  assert locations.size() > 0 : "number of data directories should be > 0";
  // TODO create our DataNode
  return new DataNode(conf, locations, resources);
}
```
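`checkStorageLocations` keeps only the configured data directories that pass the disk check, and there must be at least one left (hence the `assert locations.size() > 0`). A simplified sketch of that filtering with `java.nio.file`; `usableDirs` is a hypothetical name, and the sketch skips the permission handling (`dfs.datanode.data.dir.perm`) that the real `DataNodeDiskChecker` applies.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StorageLocationCheckSketch {
    // Keep every directory that exists (or can be created) and is readable and writable.
    static List<Path> usableDirs(List<Path> configured) {
        List<Path> ok = new ArrayList<>();
        List<Path> invalid = new ArrayList<>();
        for (Path dir : configured) {
            try {
                Files.createDirectories(dir); // also creates missing parent directories
                if (Files.isDirectory(dir) && Files.isReadable(dir) && Files.isWritable(dir)) {
                    ok.add(dir);
                } else {
                    invalid.add(dir);
                }
            } catch (IOException | SecurityException e) {
                invalid.add(dir);
            }
        }
        if (ok.isEmpty()) {
            // Corresponds to "number of data directories should be > 0".
            throw new IllegalStateException("All data directories are invalid: " + invalid);
        }
        return ok;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("dn-sketch");
        Path good = root.resolve("data1").resolve("current");
        Path plainFile = Files.createFile(root.resolve("not-a-dir"));
        Path bad = plainFile.resolve("data2"); // parent is a regular file, so creation fails
        System.out.println(usableDirs(Arrays.asList(good, bad))); // only the "good" path survives
    }
}
```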

2.6 new DataNode

```java
/**
 * Create the DataNode given a configuration, an array of dataDirs,
 * and a namenode proxy
 */
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final SecureResources resources) throws IOException {
  super(conf);
  ...
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    // TODO start the DataNode
    startDataNode(conf, dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  final int dncCacheMaxSize =
      conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
          DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ;
  datanodeNetworkCounts =
      CacheBuilder.newBuilder()
          .maximumSize(dncCacheMaxSize)
          .build(new CacheLoader<String, Map<String, Long>>() {
            @Override
            public Map<String, Long> load(String key) throws Exception {
              final Map<String, Long> ret = new HashMap<String, Long>();
              ret.put("networkErrors", 0L);
              return ret;
            }
          });
}
```

2.7 startDataNode

```java
/**
 * This method starts the data node with the specified conf.
 *
 * @param conf - the configuration
 *  if conf's CONFIG_PROPERTY_SIMULATED property is set
 *  then a simulated storage based data node is created.
 *
 * @param dataDirs - only for a non-simulated storage data node
 * @throws IOException
 */
void startDataNode(Configuration conf,
                   List<StorageLocation> dataDirs,
                   SecureResources resources
                   ) throws IOException {

  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirs;
  }
  this.conf = conf;
  this.dnConf = new DNConf(conf);
  checkSecureConfig(dnConf, conf, resources);

  this.spanReceiverHost = SpanReceiverHost.getInstance(conf);

  if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) of %d bytes is more than the datanode's available" +
          " RLIMIT_MEMLOCK ulimit of %d bytes.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
          dnConf.maxLockedMemory,
          ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = " +
      dnConf.maxLockedMemory);

  // TODO the storage object
  storage = new DataStorage();

  // global DN settings
  registerMXBean();
  // TODO initialize the DataXceiver (data-transfer server)
  initDataXceiver(conf);
  // TODO start the HTTP server
  startInfoServer(conf);
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
  initIpcServer(conf);

  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  blockPoolManager = new BlockPoolManager(this);
  blockPoolManager.refreshNamenodes(conf);

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.conf,
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
```
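The `maxLockedMemory` guard at the top of `startDataNode` reduces to two rules: a positive `dfs.datanode.max.locked.memory` requires native mlock support, and on non-Windows systems it must not exceed the `RLIMIT_MEMLOCK` ulimit. A pure-Java sketch of that decision; as an assumption for portability, the results of the `NativeIO` calls are passed in as plain parameters, and `LockedMemoryCheckSketch.check` is a hypothetical name.

```java
class LockedMemoryCheckSketch {
    // Returns normally if the DataNode may start; throws otherwise.
    // canMlock and memlockUlimit stand in for the NativeIO calls in the real code.
    static void check(long maxLockedMemory, boolean canMlock, boolean windows, long memlockUlimit) {
        if (maxLockedMemory <= 0) {
            return; // caching disabled: nothing to verify
        }
        if (!canMlock) {
            throw new IllegalStateException(
                "max locked memory > 0 but native code is not available");
        }
        if (!windows && maxLockedMemory > memlockUlimit) {
            throw new IllegalStateException(String.format(
                "max locked memory of %d bytes exceeds RLIMIT_MEMLOCK of %d bytes",
                maxLockedMemory, memlockUlimit));
        }
    }

    public static void main(String[] args) {
        check(0, false, false, 0);                 // fine: feature off
        check(64L << 20, true, false, 128L << 20); // fine: 64 MiB <= 128 MiB
        try {
            check(256L << 20, true, false, 64L << 20);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```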

2.8 initDataXceiver

```java
private void initDataXceiver(Configuration conf) throws IOException {
  // find free port or use privileged port provided
  TcpPeerServer tcpPeerServer;
  if (secureResources != null) {
    tcpPeerServer = new TcpPeerServer(secureResources);
  } else {
    tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
        DataNode.getStreamingAddr(conf));
  }
  tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
  streamingAddr = tcpPeerServer.getStreamingAddr();
  LOG.info("Opened streaming server at " + streamingAddr);
  this.threadGroup = new ThreadGroup("dataXceiverServer");
  // TODO instantiate a DataXceiverServer: the service the DataNode uses to
  // receive data from clients and from other DataNodes
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  // run it as a daemon thread
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
      conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
    DomainPeerServer domainPeerServer =
              getDomainPeerServer(conf, streamingAddr.getPort());
    if (domainPeerServer != null) {
      this.localDataXceiverServer = new Daemon(threadGroup,
          new DataXceiverServer(domainPeerServer, conf, this));
      LOG.info("Listening on UNIX domain socket: " +
          domainPeerServer.getBindPath());
    }
  }
  this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
```
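`DataXceiverServer` runs in a daemon thread: it accepts connections on the streaming port and hands each one to its own `DataXceiver` thread. A heavily simplified sketch of that thread-per-connection pattern with `java.net.ServerSocket`; `ToyXceiverServer` is hypothetical, and its one-line "ack" protocol is invented for illustration (the real server speaks the HDFS data-transfer protocol).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class ToyXceiverServer implements Runnable {
    private final ServerSocket serverSocket;
    private final ThreadGroup group = new ThreadGroup("toyXceiverServer");

    ToyXceiverServer() {
        try {
            // Port 0 = let the OS pick a free port, similar to "find free port".
            serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int port() { return serverSocket.getLocalPort(); }

    @Override
    public void run() {
        while (!serverSocket.isClosed()) {
            try {
                Socket peer = serverSocket.accept();
                // One handler thread per connection, like one DataXceiver per peer.
                Thread handler = new Thread(group, () -> handle(peer), "toyXceiver");
                handler.setDaemon(true);
                handler.start();
            } catch (IOException e) {
                return; // socket closed: stop accepting
            }
        }
    }

    private static void handle(Socket peer) {
        try (Socket s = peer;
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            out.println("ack " + in.readLine());
        } catch (IOException ignored) {
            // a real server would log this and count a network error
        }
    }

    // Start the accept loop as a daemon thread, like new Daemon(threadGroup, xserver).
    Thread startDaemon() {
        Thread t = new Thread(this, "toyXceiverServer");
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Client side: connect, send one line, return the reply.
    static String request(int port, String line) {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = new PrintWriter(s.getOutputStream(), true);
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            out.println(line);
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void close() {
        try { serverSocket.close(); } catch (IOException ignored) { }
    }

    public static void main(String[] args) {
        ToyXceiverServer server = new ToyXceiverServer();
        server.startDaemon();
        System.out.println(request(server.port(), "hello")); // ack hello
        server.close();
    }
}
```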

2.9 startInfoServer

```java
/**
 * @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
 * for information related to the different configuration options and
 * Http Policy is decided.
 */
private void startInfoServer(Configuration conf)
  throws IOException {
  Configuration confForInfoServer = new Configuration(conf);
  confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
  // The NameNode also starts an HttpServer2 when it boots.
  // TODO used to receive HTTP requests
  HttpServer2.Builder builder = new HttpServer2.Builder()
    .setName("datanode")
    .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
    .addEndpoint(URI.create("http://localhost:0"))
    .setFindPort(true);

  this.infoServer = builder.build();

  // TODO bind several servlets to this HTTP server
  this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
  this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
      FileChecksumServlets.GetServlet.class);

  this.infoServer.setAttribute("datanode", this);
  this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  this.infoServer.addServlet(null, "/blockScannerReport",
                             BlockScanner.Servlet.class);
  // start the HTTP server
  this.infoServer.start();
  InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);

  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  this.httpServer = new DatanodeHttpServer(conf, jettyAddr, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}
```
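`startInfoServer` boils down to: build an HTTP server on an ephemeral port (`http://localhost:0` plus `setFindPort(true)`), bind handlers to paths such as `/blockScannerReport`, start it, and read back the real port. The sketch below reproduces that shape with the JDK's built-in `com.sun.net.httpserver.HttpServer` in place of Hadoop's Jetty-based `HttpServer2`; `ToyInfoServer` and its response body are placeholders invented for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class ToyInfoServer {
    private final HttpServer server;

    ToyInfoServer() {
        try {
            // Port 0: let the OS pick a free port (like "http://localhost:0").
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Bind a handler to a path, roughly what addServlet(...) does in HttpServer2.
        server.createContext("/blockScannerReport", exchange -> {
            byte[] body = "scanner report placeholder".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
    }

    int start() {
        server.start();
        return server.getAddress().getPort(); // the actually bound port, like infoPort
    }

    void stop() { server.stop(0); }

    static String get(int port, String path) {
        try {
            HttpURLConnection conn = (HttpURLConnection)
                URI.create("http://127.0.0.1:" + port + path).toURL().openConnection();
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ToyInfoServer s = new ToyInfoServer();
        int port = s.start();
        System.out.println(get(port, "/blockScannerReport"));
        s.stop();
    }
}
```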

2.10 initIpcServer

```java
private void initIpcServer(Configuration conf) throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));

  // Add all the RPC protocols that the Datanode implements
  RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
        new ClientDatanodeProtocolServerSideTranslatorPB(this);
  BlockingService service = ClientDatanodeProtocolService
      .newReflectiveBlockingService(clientDatanodeProtocolXlator);
  ipcServer = new RPC.Builder(conf)
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager).build();

  InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
      new InterDatanodeProtocolServerSideTranslatorPB(this);
  service = InterDatanodeProtocolService
      .newReflectiveBlockingService(interDatanodeProtocolXlator);
  DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
      ipcServer);

  TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
      new TraceAdminProtocolServerSideTranslatorPB(this);
  BlockingService traceAdminService = TraceAdminService
      .newReflectiveBlockingService(traceAdminXlator);
  DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
      ipcServer);

  LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());

  // set service-level authorization security policy
  if (conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
    ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
  }
}
```

2.11 new BlockPoolManager

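```java
// TODO create the BlockPoolManager
/**
 * BlockPool: a non-federated cluster has a single block pool.
 * With federation there are several nameservices (each usually an HA pair of
 * NameNodes), and each nameservice has its own block pool.
 * Example: a cluster with 4 NameNodes in two federated nameservices:
 *   nameservice 1: hadoop1 (Active), hadoop2 (Standby)  (share one block pool)
 *   nameservice 2: hadoop3 (Active), hadoop4 (Standby)  (share one block pool)
 */
blockPoolManager = new BlockPoolManager(this);
```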

2.12 refreshNamenodes

```java
void refreshNamenodes(Configuration conf)
    throws IOException {
  LOG.info("Refresh request received for nameservices: " + conf.get
          (DFSConfigKeys.DFS_NAMESERVICES));

  Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
          .getNNServiceRpcAddressesForCluster(conf);

  synchronized (refreshNamenodesLock) {
    // TODO the important call
    doRefreshNamenodes(newAddressMap);
  }
}
```
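`DFSUtil.getNNServiceRpcAddressesForCluster(conf)` returns a two-level map: nameservice ID → (NameNode ID → RPC address). In an HA setup these entries come from keys such as `dfs.nameservices`, `dfs.ha.namenodes.<nameservice>` and `dfs.namenode.rpc-address.<nameservice>.<namenode>`. The sketch below builds the same shape from a plain `Map<String, String>`; `buildAddressMap` is hypothetical and deliberately ignores the `dfs.namenode.servicerpc-address` keys, non-HA fallbacks and validation the real method performs.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class NnAddressMapSketch {
    // Returns nameserviceId -> (namenodeId -> unresolved RPC address).
    static Map<String, Map<String, InetSocketAddress>> buildAddressMap(Map<String, String> conf) {
        Map<String, Map<String, InetSocketAddress>> result = new LinkedHashMap<>();
        for (String ns : conf.getOrDefault("dfs.nameservices", "").split(",")) {
            ns = ns.trim();
            if (ns.isEmpty()) continue;
            Map<String, InetSocketAddress> nns = new LinkedHashMap<>();
            for (String nn : conf.getOrDefault("dfs.ha.namenodes." + ns, "").split(",")) {
                nn = nn.trim();
                if (nn.isEmpty()) continue;
                String hostPort = conf.get("dfs.namenode.rpc-address." + ns + "." + nn);
                if (hostPort == null) continue; // the real code is stricter about missing keys
                int colon = hostPort.lastIndexOf(':');
                if (colon < 0) continue;        // malformed entry: skipped in this sketch
                String host = hostPort.substring(0, colon);
                int port = Integer.parseInt(hostPort.substring(colon + 1));
                nns.put(nn, InetSocketAddress.createUnresolved(host, port));
            }
            result.put(ns, nns);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("dfs.nameservices", "ns1");
        conf.put("dfs.ha.namenodes.ns1", "nn1,nn2");
        conf.put("dfs.namenode.rpc-address.ns1.nn1", "hadoop1:8020");
        conf.put("dfs.namenode.rpc-address.ns1.nn2", "hadoop2:8020");
        Map<String, Map<String, InetSocketAddress>> m = buildAddressMap(conf);
        System.out.println(m.get("ns1").keySet()); // [nn1, nn2]
    }
}
```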

2.13 doRefreshNamenodes

```java
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);

  // sets of nameservice (federation) IDs
  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    // TODO typically an HDFS cluster uses the HA architecture:
    // one nameservice made of (hadoop1, hadoop2)
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        // TODO toAdd collects the new nameservices; one federation member = one nameservice
        toAdd.add(nameserviceId);
      }
    }

    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));

    assert toRefresh.size() + toAdd.size() ==
      addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
        " toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
        " toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));
      // TODO iterate over every nameservice; with HA each one has two NameNodes.
      // In a typical (non-federated) cluster toAdd holds just one entry.
      for (String nsToAdd : toAdd) {
        ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToAdd).values());
        // TODO key relationships:
        //   one nameservice           -> one BPOfferService
        //   one NameNode inside it    -> one BPServiceActor
        // so with HA a BPOfferService normally has two BPServiceActors
        BPOfferService bpos = createBPOS(addrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    // TODO registration and heartbeats start from here
    startAll();
  }

  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRemove));

    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }

  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRefresh));
    // iterate over all of them
    for (String nsToRefresh : toRefresh) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      ArrayList<InetSocketAddress> addrs =
        Lists.newArrayList(addrMap.get(nsToRefresh).values());
      bpos.refreshNNList(addrs);
    }
  }
}
```
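Steps 1 and 2 of `doRefreshNamenodes` are plain set arithmetic over nameservice IDs: configured IDs that are already running are refreshed, configured-but-unknown IDs are added, and running-but-no-longer-configured IDs are removed. A standalone sketch of that classification; `NameserviceDiff` is a hypothetical class, whereas the real code uses Guava's `Sets` on `bpByNameserviceId.keySet()` and `addrMap.keySet()`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

class NameserviceDiff {
    final Set<String> toAdd = new LinkedHashSet<>();
    final Set<String> toRefresh = new LinkedHashSet<>();
    final Set<String> toRemove = new LinkedHashSet<>();

    // running: nameservices that already have a BPOfferService
    // configured: nameservices found in the freshly read configuration
    static NameserviceDiff compute(Set<String> running, Set<String> configured) {
        NameserviceDiff d = new NameserviceDiff();
        for (String ns : configured) {
            if (running.contains(ns)) {
                d.toRefresh.add(ns); // Step 1: existing NS, its NN list may have changed
            } else {
                d.toAdd.add(ns);     // Step 1: brand-new NS, needs a new BPOfferService
            }
        }
        for (String ns : running) {
            if (!configured.contains(ns)) {
                d.toRemove.add(ns);  // Step 2: NS no longer configured, stop it
            }
        }
        // Same invariant as the assert in doRefreshNamenodes.
        assert d.toAdd.size() + d.toRefresh.size() == configured.size();
        return d;
    }

    public static void main(String[] args) {
        NameserviceDiff d = compute(
            new HashSet<>(Arrays.asList("ns1", "ns2")),
            new LinkedHashSet<>(Arrays.asList("ns2", "ns3")));
        System.out.println(d.toAdd + " " + d.toRefresh + " " + d.toRemove); // [ns3] [ns2] [ns1]
    }
}
```

On the very first call nothing is running yet, so every configured nameservice lands in `toAdd`; that is the path taken at DataNode startup.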

2.14 createBPOS

```java
// TODO key relationships:
//   one nameservice           -> one BPOfferService
//   one NameNode inside it    -> one BPServiceActor
// so with HA a BPOfferService normally has two BPServiceActors
BPOfferService bpos = createBPOS(addrs);

/**
 * Extracted out for test purposes.
 */
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
  return new BPOfferService(nnAddrs, dn);
}
```

2.15 new BPOfferService

```java
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  this.dn = dn;
  // create one BPServiceActor for each NameNode address
  for (InetSocketAddress addr : nnAddrs) {
    this.bpServices.add(new BPServiceActor(addr, this));
  }
}
```
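The object graph built in 2.13 to 2.15 is: one `BPOfferService` per nameservice, holding one `BPServiceActor` per NameNode address, with each actor keeping a back-reference to its service. A stripped-down sketch of that containment; the `Toy*` classes are hypothetical stand-ins that keep only the addresses.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ToyBPServiceActor {
    final InetSocketAddress nnAddr;
    final ToyBPOfferService bpos; // back-reference, as in new BPServiceActor(addr, this)

    ToyBPServiceActor(InetSocketAddress nnAddr, ToyBPOfferService bpos) {
        this.nnAddr = nnAddr;
        this.bpos = bpos;
    }
}

class ToyBPOfferService {
    final List<ToyBPServiceActor> bpServices = new ArrayList<>();

    ToyBPOfferService(List<InetSocketAddress> nnAddrs) {
        // Same precondition as Preconditions.checkArgument(!nnAddrs.isEmpty(), ...)
        if (nnAddrs.isEmpty()) {
            throw new IllegalArgumentException("Must pass at least one NN.");
        }
        for (InetSocketAddress addr : nnAddrs) {
            bpServices.add(new ToyBPServiceActor(addr, this)); // one actor per NameNode
        }
    }

    public static void main(String[] args) {
        // An HA nameservice with two NameNodes yields two actors.
        ToyBPOfferService bpos = new ToyBPOfferService(Arrays.asList(
            InetSocketAddress.createUnresolved("hadoop1", 8020),
            InetSocketAddress.createUnresolved("hadoop2", 8020)));
        System.out.println(bpos.bpServices.size()); // 2
    }
}
```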

2.16 DataNode initialization flowchart so far

[Figure: DataNode initialization flowchart]

3. DataNode registration mechanism

3.1 startAll, called from doRefreshNamenodes

```java
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            // TODO iterate over every BPOfferService, i.e. every nameservice
            for (BPOfferService bpos : offerServices) {
              // TODO important
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
```

3.2 bpos.start()

```java
//This must be called only by blockPoolManager
void start() {
  // TODO one BPOfferService holds several actors
  for (BPServiceActor actor : bpServices) {
    // TODO this is where the DataNode registers and starts heartbeating
    /**
     * First nameservice: hadoop1, hadoop2
     *   actor1 -> hadoop1
     *   actor2 -> hadoop2
     */
    actor.start();
  }
}
```

3.3 actor.start();

```java
//This must be called only by BPOfferService
void start() {
  if ((bpThread != null) && (bpThread.isAlive())) {
    //Thread is started already
    return;
  }
  bpThread = new Thread(this, formatThreadName());
  // run as a daemon thread
  bpThread.setDaemon(true); // needed for JUnit testing
  // TODO start the thread; next, look at its run() method
  bpThread.start();
}
```
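`BPServiceActor.start()` is idempotent: if the thread already exists and is alive it returns immediately; otherwise it creates a daemon thread that runs the actor's `run()`. A minimal sketch of that guard; `IdempotentStarter` is hypothetical, and a latch stands in for the register-then-heartbeat loop so the toy thread stays alive while the second `start()` is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentStarter implements Runnable {
    private Thread bpThread;
    final AtomicInteger runs = new AtomicInteger();
    final CountDownLatch stop = new CountDownLatch(1);

    synchronized void start() {
        if (bpThread != null && bpThread.isAlive()) {
            return; // thread is started already
        }
        bpThread = new Thread(this, "toy-actor");
        bpThread.setDaemon(true); // does not keep the JVM alive on its own
        bpThread.start();         // executes run() below on the new thread
    }

    @Override
    public void run() {
        runs.incrementAndGet();
        try {
            stop.await(); // stand-in for the register-then-heartbeat loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    synchronized Thread thread() { return bpThread; }

    public static void main(String[] args) throws InterruptedException {
        IdempotentStarter actor = new IdempotentStarter();
        actor.start();
        Thread first = actor.thread();
        actor.start(); // no-op: the first thread is still alive
        System.out.println(first == actor.thread()); // true
        actor.stop.countDown();
        first.join();
        System.out.println(actor.runs.get()); // 1
    }
}
```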

3.4 BPServiceActor's run()

```java
try {
  while (true) {
    // init stuff
    try {
      // setup storage
      // TODO core registration code
      // why while (true)? We try as hard as possible to get this done
      connectToNNAndHandshake();
      break;
    } catch (IOException ioe) {
      // Initial handshake, storage recovery or registration failed
      runningState = RunningState.INIT_FAILED;
      if (shouldRetryInit()) {
        // Retry until all namenode's of BPOS failed initialization
        LOG.error("Initialization failed for " + this + " "
            + ioe.getLocalizedMessage());
        // TODO if something went wrong, sleep 5 seconds
        // TODO and then try again
        sleepAndLogInterrupts(5000, "initializing");
      } else {
        runningState = RunningState.FAILED;
        LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
        return;
      }
    }
  }
  // registration is done
```
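The registration loop in `run()` is a retry-until-success pattern: on `IOException` it records `INIT_FAILED`, sleeps 5 seconds and tries again, unless `shouldRetryInit()` says to give up. A generic sketch of that control flow; `retryInit`, `IoAction` and the `maxAttempts` cap are assumptions of the sketch (the real code decides via `shouldRetryInit()`, not a fixed count), and the sleep is a parameter so it can run quickly.

```java
import java.io.IOException;

class RetryInitSketch {
    @FunctionalInterface
    interface IoAction {
        void run() throws IOException;
    }

    // Returns the number of attempts used, or -1 if we gave up (like RunningState.FAILED).
    static int retryInit(IoAction connectAndHandshake, int maxAttempts, long sleepMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                connectAndHandshake.run();
                return attempt; // success: leave the loop, like "break"
            } catch (IOException ioe) {
                if (attempt >= maxAttempts) {
                    return -1; // stand-in for shouldRetryInit() == false
                }
                try {
                    Thread.sleep(sleepMs); // the real code sleeps 5000 ms
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = retryInit(() -> {
            if (++calls[0] < 3) throw new IOException("NameNode not up yet");
        }, 10, 10);
        System.out.println(attempts); // 3
    }
}
```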

3.5 connectToNNAndHandshake()

```java
private void connectToNNAndHandshake() throws IOException {
  // TODO obtain a proxy for the NameNode
  // (i.e. the RPC client)
  // get NN proxy
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  // TODO verify the NamespaceInfo
  // TODO (the DataNode talks to both NameNodes of the HA pair)
  bpos.verifyAndSetNamespaceInfo(nsInfo);

  // Second phase of the handshake with the NN.
  // TODO register the DataNode
  register(nsInfo);
}
```
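`bpos.verifyAndSetNamespaceInfo(nsInfo)` is what ties the two actors of an HA pair together: the first NameNode to answer sets the block pool's namespace identity, and every later answer must agree with it. A simplified sketch that compares only the block pool ID and cluster ID (the real check covers more `NamespaceInfo` fields and throws `IOException`); `NamespaceGuard` is a hypothetical name and the IDs in `main` are made-up example values.

```java
class NamespaceGuard {
    private String blockPoolId; // null until the first NameNode answers
    private String clusterId;

    synchronized void verifyAndSet(String bpId, String cid) {
        if (blockPoolId == null) {
            // First NN connection for this block pool: adopt its identity.
            blockPoolId = bpId;
            clusterId = cid;
            return;
        }
        // Every other NN of the HA pair must report the same identity.
        if (!blockPoolId.equals(bpId) || !clusterId.equals(cid)) {
            throw new IllegalStateException("NN reported " + bpId + "/" + cid
                + " but expected " + blockPoolId + "/" + clusterId);
        }
    }

    synchronized String blockPoolId() { return blockPoolId; }

    public static void main(String[] args) {
        NamespaceGuard g = new NamespaceGuard();
        g.verifyAndSet("BP-1-10.0.0.1-1593900000000", "CID-abc"); // answer from hadoop1
        g.verifyAndSet("BP-1-10.0.0.1-1593900000000", "CID-abc"); // hadoop2 agrees
        try {
            g.verifyAndSet("BP-2-10.0.0.9-1593900000000", "CID-xyz"); // a foreign NameNode
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```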

3.6 register(nsInfo);

```java
/**
 * Register one bp with the corresponding NameNode
 * <p>
 * The bpDatanode needs to register with the namenode on startup in order
 * 1) to report which storage it is serving now and
 * 2) to receive a registrationID
 *
 * issued by the namenode to recognize registered datanodes.
 *
 * @param nsInfo current NamespaceInfo
 * @see FSNamesystem#registerDatanode(DatanodeRegistration)
 * @throws IOException
 */
void register(NamespaceInfo nsInfo) throws IOException {
  // The handshake() phase loaded the block pool storage
  // off disk - so update the bpRegistration object from that info
  // TODO build the registration info, i.e. this DataNode's own details
  // (storage, blocks, and so on)
  bpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // TODO call registerDatanode on the server side;
      // bpNamenode is the client-side proxy that stands for the NameNode
      bpRegistration = bpNamenode.registerDatanode(bpRegistration);
      // TODO reaching this line means registration has completed
      bpRegistration.setNamespaceInfo(nsInfo);
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }

  LOG.info("Block pool " + this + " successfully registered with NN");
  bpos.registrationSucceeded(this, bpRegistration);

  // random short delay - helps scatter the BR from all DNs
  scheduleBlockReport(dnConf.initialBlockReportDelay);
}
```
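The last line of `register` schedules the first block report after a random short delay, so that many DataNodes restarting together do not all send block reports at the same moment. A sketch of that jitter under the assumption that the delay is drawn roughly uniformly from `[0, initialDelayMs)`; `firstReportDelay` is a hypothetical helper, and in the real configuration `dnConf.initialBlockReportDelay` is derived from `dfs.blockreport.initialDelay`.

```java
import java.util.Random;

class BlockReportJitter {
    // How long to wait before the first block report.
    // Sketch assumption: roughly uniform in [0, initialDelayMs); 0 disables the jitter.
    static long firstReportDelay(long initialDelayMs, Random rnd) {
        if (initialDelayMs <= 0) {
            return 0;
        }
        return Math.floorMod(rnd.nextLong(), initialDelayMs);
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        for (int dn = 1; dn <= 3; dn++) {
            System.out.println("dn" + dn + " waits " + firstReportDelay(60_000, rnd) + " ms");
        }
    }
}
```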

3.7 registerDatanode in NameNodeRpcServer

```java
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  // has the NameNode finished starting up?
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // TODO register the DataNode
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}
```

3.8 namesystem.registerDatanode(nodeReg);

```java
/**
 * Register Datanode.
 * <p>
 * The purpose of registration is to identify whether the new datanode
 * serves a new data storage, and will report new data block copies,
 * which the namenode was not aware of; or the datanode is a replacement
 * node for the data storage that was previously served by a different
 * or the same (in terms of host:port) datanode.
 * The data storages are distinguished by their storageIDs. When a new
 * data storage is reported the namenode issues a new unique storageID.
 * <p>
 * Finally, the namenode returns its namespaceID as the registrationID
 * for the datanodes.
 * namespaceID is a persistent attribute of the name space.
 * The registrationID is checked every time the datanode is communicating
 * with the namenode.
 * Datanodes with inappropriate registrationID are rejected.
 * If the namenode stops, and then restarts it can restore its
 * namespaceID and will continue serving the datanodes that has previously
 * registered with the namenode without restarting the whole cluster.
 *
 * @see org.apache.hadoop.hdfs.server.datanode.DataNode
 */
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    // TODO the DatanodeManager handles everything DataNode-related
    getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
    checkSafeMode();
  } finally {
    writeUnlock();
  }
}
```

3.9 registerDatanode() in DatanodeManager

```java
// register new datanode
// TODO register the DataNode
addDatanode(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
// TODO add the newly registered DataNode to the HeartbeatManager,
// which from now on tracks it through the heartbeat mechanism
heartbeatManager.addDatanode(nodeDescr);
```

3.10 addDatanode(nodeDescr);

```java
/** Add a datanode. */
// TODO registering a DataNode really just means adding it to a few data structures
void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove  from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized(datanodeMap) {
    // TODO was this DataNode registered before? If so, drop its old host entry
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  // TODO add an entry to the network topology
  networktopology.add(node); // may throw InvalidTopologyException
  // TODO add an entry to the in-memory host map
  host2DatanodeMap.add(node);
  // check whether the cluster now spans more than one rack
  checkIfClusterIsNowMultiRack(node);

  if (LOG.isDebugEnabled()) {
    LOG.debug(getClass().getSimpleName() + ".addDatanode: "
        + "node " + node + " is added to datanodeMap.");
  }
}
```
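`host2DatanodeMap.remove(datanodeMap.put(uuid, node))` relies on `Map.put` returning the previous value: if a DataNode with the same UUID registers again (for example from a different address), its stale descriptor is removed from the host index before the new one is added. A sketch with plain `HashMap`s; `ToyDatanodeRegistry` is hypothetical, and its one-node-per-host map is a simplification of the real `Host2NodesMap`, which can hold several DataNodes per host.

```java
import java.util.HashMap;
import java.util.Map;

class ToyDatanodeRegistry {
    static final class Node {
        final String uuid;
        final String host;
        Node(String uuid, String host) { this.uuid = uuid; this.host = host; }
    }

    final Map<String, Node> datanodeMap = new HashMap<>(); // uuid -> node
    final Map<String, Node> hostMap = new HashMap<>();     // host -> node (simplified)

    synchronized void addDatanode(Node node) {
        Node previous = datanodeMap.put(node.uuid, node); // null on first registration
        if (previous != null) {
            hostMap.remove(previous.host);                 // drop the stale host entry
        }
        hostMap.put(node.host, node);
    }

    public static void main(String[] args) {
        ToyDatanodeRegistry r = new ToyDatanodeRegistry();
        r.addDatanode(new Node("uuid-1", "hadoop03"));
        r.addDatanode(new Node("uuid-1", "hadoop05")); // same DataNode, new address
        System.out.println(r.datanodeMap.size() + " " + r.hostMap.keySet()); // 1 [hadoop05]
    }
}
```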

3.11 Registration flowchart (NameNode side)

[Figure: registration flowchart]

4. DataNode heartbeat mechanism

4.1 heartbeatManager.addDatanode(nodeDescr);

```java
synchronized void addDatanode(final DatanodeDescriptor d) {
  // update in-service node count
  // add this node's figures to the aggregate statistics
  stats.add(d);
  // store the DataNode's information in the datanodes list
  datanodes.add(d);
  d.isAlive = true;
}
```

4.2 BPServiceActor's run()

```java
while (shouldRun()) {
  try {
    // TODO send heartbeats
    offerService();
  } catch (Exception ex) {
    LOG.error("Exception in BPOfferService for " + this, ex);
    sleepAndLogInterrupts(5000, "offering service");
  }
}
```

4.3 offerService()

```java
while (shouldRun()) {
  try {
    final long startTime = monotonicNow();

    //
    // Every so often, send heartbeat or block-report
    //
    // TODO by default a heartbeat is sent every three seconds
    // startTime     = current (monotonic) time
    // lastHeartbeat = time of the last heartbeat
    // dnConf.heartBeatInterval = 3 * 1000L by default, i.e. three seconds
    if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
      //
      // All heartbeat messages include following info:
      // -- Datanode name
      // -- data transfer port
      // -- Total capacity
      // -- Bytes remaining
      //
      // TODO update the last-heartbeat time
      lastHeartbeat = startTime;
      if (!dn.areHeartbeatsDisabledForTests()) {
        /**
         * The NameNode never connects to the DataNode directly:
         * the DataNode sends a heartbeat to the NameNode,
         * the NameNode replies with some commands,
         * and the DataNode carries out those commands.
         */
        // TODO send the heartbeat; the response carries the NameNode's commands
        HeartbeatResponse resp = sendHeartBeat();
        assert resp != null;
        dn.getMetrics().addHeartbeat(monotonicNow() - startTime);

        // If the state of this NN has changed (eg STANDBY->ACTIVE)
        // then let the BPOfferService update itself.
        //
        // Important that this happens before processCommand below,
        // since the first heartbeat to a new active might have commands
        // that we should actually process.
        bpos.updateActorStatesFromHeartbeat(
            this, resp.getNameNodeHaState());
        state = resp.getNameNodeHaState().getState();

        if (state == HAServiceState.ACTIVE) {
          handleRollingUpgradeStatus(resp);
        }

        long startProcessCommands = monotonicNow();
        if (!processCommand(resp.getCommands()))
          continue;
        long endProcessCommands = monotonicNow();
        if (endProcessCommands - startProcessCommands > 2000) {
          LOG.info("Took " + (endProcessCommands - startProcessCommands)
              + "ms to process " + resp.getCommands().length
              + " commands from NN");
        }
      }
    }
```
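The scheduling test `startTime - lastHeartbeat >= dnConf.heartBeatInterval` uses a monotonic clock, so wall-clock adjustments cannot delay or speed up heartbeats. A sketch of that check with an injectable time source so it can be tested deterministically; `HeartbeatTimer` is hypothetical, 3000 ms matches the default 3-second heartbeat interval, and starting "one interval ago" (so the first check fires at once) is an assumption of the sketch.

```java
import java.util.function.LongSupplier;

class HeartbeatTimer {
    private final long intervalMs;
    private final LongSupplier monotonicNowMs; // e.g. () -> System.nanoTime() / 1_000_000
    private long lastHeartbeat;

    HeartbeatTimer(long intervalMs, LongSupplier monotonicNowMs) {
        this.intervalMs = intervalMs;
        this.monotonicNowMs = monotonicNowMs;
        // Start "one interval ago" so the very first check fires immediately.
        this.lastHeartbeat = monotonicNowMs.getAsLong() - intervalMs;
    }

    // Returns true (and records the time) if a heartbeat should be sent now.
    boolean tryBeat() {
        long startTime = monotonicNowMs.getAsLong();
        if (startTime - lastHeartbeat >= intervalMs) {
            lastHeartbeat = startTime; // same update as in offerService()
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] clock = {10_000};
        HeartbeatTimer t = new HeartbeatTimer(3_000, () -> clock[0]);
        StringBuilder beats = new StringBuilder();
        for (int step = 0; step < 8; step++) { // simulate 8 seconds, 1 s per step
            beats.append(t.tryBeat() ? 'H' : '.');
            clock[0] += 1_000;
        }
        System.out.println(beats); // H..H..H.
    }
}
```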

4.4 sendHeartBeat()

```java
HeartbeatResponse sendHeartBeat() throws IOException {
  // TODO this runs once per heartbeat interval (every three seconds by default)
  StorageReport[] reports =
      dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Sending heartbeat with " + reports.length +
              " storage reports from service actor: " + this);
  }

  VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
      .getVolumeFailureSummary();
  int numFailedVolumes = volumeFailureSummary != null ?
      volumeFailureSummary.getFailedStorageLocations().length : 0;
  // TODO send the heartbeat through the NameNode proxy
  return bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary);
}
```

4.5 sendHeartbeat() in NameNodeRpcServer

```java
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary)
    throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  // TODO handle the heartbeat sent by the DataNode
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary);
}
```

4.6 FSNamesystem.handleHeartbeat()

```java
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
    // TODO the NameNode processes the heartbeat sent by the DataNode
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);

    //create ha status
    final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getLastAppliedOrWrittenTxId());

    // TODO build the response for the DataNode
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
  } finally {
    readUnlock();
  }
}
```
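Two ideas in this method are worth isolating: the replication budget `maxTransfer = maxReplicationStreams - xmitsInProgress`, and doing the work under the shared read lock with the unlock in `finally`. The sketch below uses `java.util.concurrent.locks.ReentrantReadWriteLock` as a stand-in for the namesystem lock; the class is hypothetical, and the value 2 is assumed to be the default of `dfs.namenode.replication.max-streams`.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the shape of FSNamesystem.handleHeartbeat(): compute the
// replication budget for one DataNode while holding the shared (read) lock.
final class HandleHeartbeatSketch {
    private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock();
    private final int maxReplicationStreams;

    HandleHeartbeatSketch(int maxReplicationStreams) {
        this.maxReplicationStreams = maxReplicationStreams;
    }

    /** Same formula as the excerpt; negative when the DataNode is already busy. */
    int maxTransfer(int xmitsInProgress) {
        return maxReplicationStreams - xmitsInProgress;
    }

    /** Returns the budget that would be handed on to DatanodeManager.handleHeartbeat(). */
    int handleHeartbeat(int xmitsInProgress) {
        fsLock.readLock().lock();          // plays the role of readLock()
        try {
            return maxTransfer(xmitsInProgress);
        } finally {
            fsLock.readLock().unlock();    // plays the role of readUnlock(); always runs
        }
    }

    /** True when no thread holds the lock in either mode. */
    boolean lockFree() {
        return fsLock.getReadLockCount() == 0 && !fsLock.isWriteLocked();
    }

    public static void main(String[] args) {
        // Assumption: 2 replication streams per DataNode
        HandleHeartbeatSketch ns = new HandleHeartbeatSketch(2);
        System.out.println(ns.handleHeartbeat(0) + " " + ns.handleHeartbeat(1)); // 2 1
    }
}
```

Because the read lock is shared, heartbeats from many DataNodes can be processed concurrently; only operations that take the exclusive write lock have to wait for them.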

4.7 DatanodeManager.handleHeartbeat()

```java
synchronized (heartbeatManager) {
  synchronized (datanodeMap) {
    DatanodeDescriptor nodeinfo = null;
    try {
      // Get the DataNode information recorded at registration time.
      // TODO look up the registered DataNode in the existing datanodeMap.
      // If it is found, the DataNode has registered before;
      // if it has never registered, there is no entry for it.
      nodeinfo = getDatanode(nodeReg);
    } catch(UnregisteredNodeException e) {
      return new DatanodeCommand[]{RegisterCommand.REGISTER};
    }

    // Check if this datanode should actually be shutdown instead.
    if (nodeinfo != null && nodeinfo.isDisallowed()) {
      setDatanodeDead(nodeinfo);
      throw new DisallowedDatanodeException(nodeinfo);
    }

    // Unknown or dead node: ask the DataNode to register again
    if (nodeinfo == null || !nodeinfo.isAlive) {
      return new DatanodeCommand[]{RegisterCommand.REGISTER};
    }

    // TODO update the important heartbeat information
    heartbeatManager.updateHeartbeat(nodeinfo, reports,
        cacheCapacity, cacheUsed,
        xceiverCount, failedVolumes,
        volumeFailureSummary);
    // ... (rest of the method omitted in this excerpt)
```
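The branch order matters: a disallowed node is rejected before the registration check, and an unknown or dead node is asked to re-register instead of being updated. The hypothetical sketch below models only that decision; a missing map entry stands in for both the `null` result and the `UnregisteredNodeException` path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the branch order in DatanodeManager.handleHeartbeat().
final class HeartbeatDecisionSketch {
    enum Outcome { REGISTER, SHUTDOWN, UPDATE }

    /** Stand-in for DatanodeDescriptor with just the two flags the branches read. */
    static final class Node {
        final boolean disallowed;
        final boolean alive;

        Node(boolean disallowed, boolean alive) {
            this.disallowed = disallowed;
            this.alive = alive;
        }
    }

    static Outcome decide(Map<String, Node> datanodeMap, String nodeId) {
        Node nodeinfo = datanodeMap.get(nodeId);   // null: never registered
        if (nodeinfo != null && nodeinfo.disallowed) {
            return Outcome.SHUTDOWN;               // real code marks it dead and throws
        }
        if (nodeinfo == null || !nodeinfo.alive) {
            return Outcome.REGISTER;               // real code returns RegisterCommand.REGISTER
        }
        return Outcome.UPDATE;                     // real code calls updateHeartbeat(...)
    }

    public static void main(String[] args) {
        Map<String, Node> map = new HashMap<>();
        map.put("dn1", new Node(false, true));
        map.put("dn2", new Node(false, false));
        System.out.println(decide(map, "dn1") + " " + decide(map, "dn2") + " "
                + decide(map, "dn9")); // UPDATE REGISTER REGISTER
    }
}
```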

4.8 HeartbeatManager.updateHeartbeat()

```java
synchronized void updateHeartbeat(final DatanodeDescriptor node,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {
  // remove this node's old numbers from the cluster-wide totals
  stats.subtract(node);
  // TODO the important call: update the node itself
  node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
      xceiverCount, failedVolumes, volumeFailureSummary);
  // add the node's new numbers back into the totals
  stats.add(node);
}
```
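`HeartbeatManager` keeps cluster-wide totals incrementally: it subtracts the node's previous numbers, lets the node update itself, then adds the new numbers back, so no full rescan is needed on every heartbeat. A minimal sketch of that pattern, with a hypothetical `Node` that only tracks capacity and used space:

```java
// Hypothetical sketch of the subtract -> update -> add pattern in
// HeartbeatManager.updateHeartbeat(): totals stay correct without rescanning all nodes.
final class ClusterStatsSketch {

    /** Stand-in for DatanodeDescriptor with two of its reported numbers. */
    static final class Node {
        long capacity;
        long dfsUsed;

        Node(long capacity, long dfsUsed) {
            this.capacity = capacity;
            this.dfsUsed = dfsUsed;
        }
    }

    private long totalCapacity;
    private long totalUsed;

    synchronized void add(Node n) {
        totalCapacity += n.capacity;
        totalUsed += n.dfsUsed;
    }

    synchronized void subtract(Node n) {
        totalCapacity -= n.capacity;
        totalUsed -= n.dfsUsed;
    }

    synchronized void updateHeartbeat(Node n, long newCapacity, long newUsed) {
        subtract(n);              // take out the numbers from the previous heartbeat
        n.capacity = newCapacity; // the node updates itself
        n.dfsUsed = newUsed;
        add(n);                   // put the fresh numbers back in
    }

    synchronized long totalCapacity() { return totalCapacity; }

    synchronized long totalUsed() { return totalUsed; }

    public static void main(String[] args) {
        ClusterStatsSketch stats = new ClusterStatsSketch();
        Node a = new Node(100, 10);
        Node b = new Node(200, 50);
        stats.add(a);
        stats.add(b);
        stats.updateHeartbeat(a, 100, 40);
        System.out.println(stats.totalCapacity() + " " + stats.totalUsed()); // 300 90
    }
}
```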

4.9 DatanodeDescriptor.updateHeartbeat()

```java
/**
 * Updates stats from datanode heartbeat.
 */
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  heartbeatedSinceRegistration = true;
}
```

4.10 DatanodeDescriptor.updateHeartbeatState()

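```java
// TODO update the storage information
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
// TODO update the time of the last heartbeat
/**
 * datanode -> namenode heartbeat -> 2020/07/05 10:10:10
 * datanode -> namenode heartbeat -> 2020/07/05 10:10:11
 *
 * Presumably some thread walks over every node's last heartbeat time and
 * subtracts it from the current time; if the difference exceeds some
 * threshold, that DataNode is considered dead.
 */
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
```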
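`updateHeartbeatState()` records the heartbeat time twice: once with the wall clock (`Time.now()`) and once with a monotonic clock (`Time.monotonicNow()`). The liveness check in 4.14 compares the monotonic value, which is not affected by wall-clock adjustments. The sketch below is a hypothetical model of those fields with injectable clocks so the effect can be tested; `System.nanoTime() / 1_000_000` is used as the monotonic source, which is roughly what Hadoop's `Time.monotonicNow()` does.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the timestamps written by updateHeartbeatState()
// and the flag set by updateHeartbeat(); not the Hadoop class.
final class LastUpdateSketch {
    private final LongSupplier wallClockMs;  // plays the role of Time.now()
    private final LongSupplier monotonicMs;  // plays the role of Time.monotonicNow()
    long lastUpdate;
    long lastUpdateMonotonic;
    boolean heartbeatedSinceRegistration;

    LastUpdateSketch(LongSupplier wallClockMs, LongSupplier monotonicMs) {
        this.wallClockMs = wallClockMs;
        this.monotonicMs = monotonicMs;
    }

    static LastUpdateSketch withSystemClocks() {
        return new LastUpdateSketch(System::currentTimeMillis,
                () -> System.nanoTime() / 1_000_000L);
    }

    void onHeartbeat() {
        lastUpdate = wallClockMs.getAsLong();          // for display and logging
        lastUpdateMonotonic = monotonicMs.getAsLong(); // for liveness checks
        heartbeatedSinceRegistration = true;
    }

    /** Elapsed time by the monotonic clock; immune to wall-clock jumps. */
    long millisSinceLastHeartbeat() {
        return monotonicMs.getAsLong() - lastUpdateMonotonic;
    }

    public static void main(String[] args) {
        LastUpdateSketch node = withSystemClocks();
        node.onHeartbeat();
        System.out.println(node.heartbeatedSinceRegistration + " "
                + (node.millisSinceLastHeartbeat() >= 0)); // true true
    }
}
```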

4.11 Important services started during NameNode initialization

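Why do we end up here? Consider that the service that tracks whether DataNodes are down, and that handles DataNodes going online and offline, lives in the NameNode. When the NameNode is initialized it starts several important services, one of which is the HeartbeatManager thread. As the name suggests, it manages DataNode heartbeats, so we need to look at its run method to find out under what conditions a DataNode is considered dead.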

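```java
// FSNamesystem, during NameNode initialization:
// TODO safe-mode check
setBlockTotal();
// TODO start important services
blockManager.activate(conf);

// BlockManager
public void activate(Configuration conf) {
  // start the thread that tracks pending replications
  pendingReplications.start();
  // TODO start the services that manage heartbeats
  datanodeManager.activate(conf);
  this.replicationThread.start();
}

// DatanodeManager
void activate(final Configuration conf) {
  // start the service that manages decommissioning DataNodes
  decomManager.activate(conf);
  // TODO manage heartbeats
  heartbeatManager.activate(conf);
}

// HeartbeatManager
void activate(Configuration conf) {
  heartbeatThread.start();
}
```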
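Each `activate()` call above hands off to the next component until `HeartbeatManager` starts its thread. The sketch below shows that last step in isolation, under assumptions: a plain daemon `Thread` stands in for the thread Hadoop creates around `Monitor`, and a `CountDownLatch` marks the first monitoring pass. Unlike the excerpt in 4.12, which ignores `InterruptedException` and loops while `namesystem.isRunning()`, this sketch exits when interrupted so it can be shut down cleanly. All names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of HeartbeatManager.activate(): a monitor loop wrapped
// in a daemon thread that is started once and stopped on shutdown.
final class MonitorThreadSketch {
    private final CountDownLatch firstPass = new CountDownLatch(1);
    private volatile boolean running = true;
    private final Thread heartbeatThread;

    MonitorThreadSketch(long sleepMs) {
        heartbeatThread = new Thread(() -> {
            while (running) {
                firstPass.countDown();            // stand-in for heartbeatCheck()
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                       // stop promptly on shutdown
                }
            }
        }, "heartbeat-monitor-sketch");
        heartbeatThread.setDaemon(true);          // must not keep the JVM alive
    }

    void activate() {
        heartbeatThread.start();                  // like heartbeatThread.start()
    }

    void close() throws InterruptedException {
        running = false;
        heartbeatThread.interrupt();
        heartbeatThread.join();
    }

    boolean awaitFirstPass(long timeoutMs) throws InterruptedException {
        return firstPass.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    boolean isAlive() {
        return heartbeatThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        MonitorThreadSketch monitor = new MonitorThreadSketch(100);
        monitor.activate();
        System.out.println("first pass ran: " + monitor.awaitFirstPass(1_000));
        monitor.close();
    }
}
```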

4.12 The run() method of HeartbeatManager's Monitor (it runs on the heartbeat thread)

```java
/** Periodically check heartbeat and update block key */
private class Monitor implements Runnable {
  private long lastHeartbeatCheck;
  private long lastBlockKeyUpdate;

  @Override
  public void run() {
    // loops (like while (true)) for as long as the namesystem is running
    while(namesystem.isRunning()) {
      try {
        final long now = Time.monotonicNow();
        // heartbeatRecheckInterval: 5 min by default; lowered to the stale
        //   interval (30 s by default) only when writes avoid stale DataNodes
        // lastHeartbeatCheck: time of the last heartbeat check
        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
          // TODO heartbeat check
          heartbeatCheck();
          lastHeartbeatCheck = now;
        }
        if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
          synchronized(HeartbeatManager.this) {
            for(DatanodeDescriptor d : datanodes) {
              d.needKeyUpdate = true;
            }
          }
          lastBlockKeyUpdate = now;
        }
      } catch (Exception e) {
        LOG.error("Exception while checking heartbeat", e);
      }
      try {
        Thread.sleep(5000);  // 5 seconds
      } catch (InterruptedException ie) {
      }
    }
  }
}
```
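How often does `heartbeatCheck()` actually run? The loop sleeps 5 s between passes and uses a strict `<`, so the real spacing is slightly longer than the recheck interval. This hypothetical simulation assumes every sleep lasts exactly `sleepMs` and that each pass takes no time; `startMs` is an arbitrary monotonic start value larger than the recheck interval, so the first pass fires immediately (as it does when `lastHeartbeatCheck` starts at 0).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the Monitor.run() cadence: the loop wakes every
// sleepMs and calls heartbeatCheck() only once the recheck interval has passed.
final class MonitorCadenceSketch {

    /** Returns the offsets (ms after startMs) at which heartbeatCheck() would run. */
    static List<Long> checkOffsets(long startMs, long recheckMs, long sleepMs, int iterations) {
        List<Long> offsets = new ArrayList<>();
        long lastHeartbeatCheck = 0;                      // field default, as in Monitor
        for (int k = 0; k < iterations; k++) {
            long now = startMs + k * sleepMs;             // Time.monotonicNow() after k sleeps
            if (lastHeartbeatCheck + recheckMs < now) {   // same strict '<' as the excerpt
                offsets.add(now - startMs);               // heartbeatCheck() would run here
                lastHeartbeatCheck = now;
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // 5-minute recheck interval, 5 s sleep, about 10 minutes of simulated time
        System.out.println(checkOffsets(1_000_000L, 300_000L, 5_000L, 123)); // [0, 305000, 610000]
    }
}
```

With the default 5-minute interval the simulated checks land 305 s apart; with a 30 s recheck interval they land 35 s apart.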

4.13 heartbeatCheck()

```java
synchronized(this) {
  // TODO iterate over all DataNodes
  /**
   * Registration put each DataNode's information into this data structure,
   * and the last-heartbeat time we just updated was also written to the
   * DataNode objects held here.
   */
  for (DatanodeDescriptor d : datanodes) {
    // check whether this DataNode is dead
    if (dead == null && dm.isDatanodeDead(d)) {
      stats.incrExpiredHeartbeats();
      dead = d;
    }
    if (d.isStale(dm.getStaleInterval())) {
      numOfStaleNodes++;
    }
    DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
    for(DatanodeStorageInfo storageInfo : storageInfos) {
      if (storageInfo.areBlockContentsStale()) {
        numOfStaleStorages++;
      }

      if (failedStorage == null &&
          storageInfo.areBlocksOnFailedStorage() &&
          d != dead) {
        failedStorage = storageInfo;
      }
    }
  }
  // ... (rest of heartbeatCheck() omitted in this excerpt)
```
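The scan keeps only the first dead node it finds (`dead == null && ...`). The sketch below models that with a hypothetical map from node name to last monotonic heartbeat time. Two points are assumptions rather than code shown above: the scan is repeated, removing one dead node per pass, until no dead node remains; and a node counts as stale once `staleMs` (30 s by default for `dfs.namenode.stale.datanode.interval`) has passed without a heartbeat.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the scan in heartbeatCheck();
// node name -> last monotonic heartbeat time (ms).
final class HeartbeatScanSketch {
    final Map<String, Long> lastUpdateMonotonic = new LinkedHashMap<>();

    /** Same comparison as isDatanodeDead(). */
    static boolean isDead(long lastUpdate, long now, long expireMs) {
        return lastUpdate < now - expireMs;
    }

    /** Assumed semantics for isStale(): staleMs or more without a heartbeat. */
    static boolean isStale(long lastUpdate, long now, long staleMs) {
        return now - lastUpdate >= staleMs;
    }

    int countStale(long now, long staleMs) {
        int n = 0;
        for (long last : lastUpdateMonotonic.values()) {
            if (isStale(last, now, staleMs)) {
                n++;
            }
        }
        return n;
    }

    /** Assumption: repeat the scan, removing the first dead node per pass, until none is left. */
    List<String> removeDeadNodes(long now, long expireMs) {
        List<String> removed = new ArrayList<>();
        while (true) {
            String dead = null;
            for (Map.Entry<String, Long> e : lastUpdateMonotonic.entrySet()) {
                if (dead == null && isDead(e.getValue(), now, expireMs)) {
                    dead = e.getKey();             // only the first dead node, as in the excerpt
                }
            }
            if (dead == null) {
                return removed;                    // every remaining node is alive
            }
            lastUpdateMonotonic.remove(dead);      // stand-in for removing the dead DataNode
            removed.add(dead);
        }
    }

    public static void main(String[] args) {
        HeartbeatScanSketch s = new HeartbeatScanSketch();
        long now = 1_000_000L;
        s.lastUpdateMonotonic.put("dn1", 999_000L);  // 1 s ago
        s.lastUpdateMonotonic.put("dn2", 300_000L);  // 700 s ago
        s.lastUpdateMonotonic.put("dn3", 960_000L);  // 40 s ago
        s.lastUpdateMonotonic.put("dn4", 100_000L);  // 900 s ago
        System.out.println(s.countStale(now, 30_000L));        // 3
        System.out.println(s.removeDeadNodes(now, 630_000L));  // [dn2, dn4]
        System.out.println(s.lastUpdateMonotonic.keySet());    // [dn1, dn3]
    }
}
```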

4.14 isDatanodeDead()

```java
/** Is the datanode dead? */
boolean isDatanodeDead(DatanodeDescriptor node) {
  // Decide here whether the DataNode is dead.
  /**
   * heartbeatExpireInterval = 2 * heartbeatRecheckInterval
   *                         + 10 * 1000 * heartbeatIntervalSeconds
   * heartbeatRecheckInterval: 5 minutes (300,000 ms)
   * heartbeatIntervalSeconds: 3
   * so a DataNode is declared dead after 2 * 5 min + 10 * 3 s = 10 min 30 s
   */
  return (node.getLastUpdateMonotonic() <
          (monotonicNow() - heartbeatExpireInterval));
}
```
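The 10 min 30 s figure follows directly from the formula in the comment. The Java below simply evaluates it with the defaults of `dfs.namenode.heartbeat.recheck-interval` (300000 ms) and `dfs.heartbeat.interval` (3 s); the class name is hypothetical.

```java
// Worked arithmetic for heartbeatExpireInterval (hypothetical helper class).
final class ExpireIntervalSketch {

    /** 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalSeconds, in ms. */
    static long heartbeatExpireIntervalMs(long recheckIntervalMs, long heartbeatIntervalSeconds) {
        return 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalSeconds;
    }

    public static void main(String[] args) {
        long expire = heartbeatExpireIntervalMs(300_000L, 3L);
        long minutes = expire / 60_000;
        long seconds = (expire % 60_000) / 1_000;
        System.out.println(expire + " ms = " + minutes + " min " + seconds + " s"); // 630000 ms = 10 min 30 s
        // how many 3 s heartbeat intervals fit into that window
        System.out.println(expire / 3_000 + " heartbeat intervals"); // 210 heartbeat intervals
    }
}
```

In other words, a DataNode is declared dead only after roughly 210 consecutive 3 s heartbeat intervals pass without a heartbeat, which is why a short network hiccup does not get it removed.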

4.15 Heartbeat mechanism flowchart

[Figure: heartbeat mechanism flowchart]

Summary of the DataNode startup process

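  1. The DataNode is initialized first
    • Initialize DataStorage, which manages the DataNode's storage
    • Initialize DataXceiver, the core of the whole DataNode (covered in a later article)
    • Initialize DatanodeHttpServer
    • Start HttpServer2, which has a number of servlets bound to it
    • Initialize RPC
    • Create the BlockPoolManager (one per DataNode, managing every nameservice of a federated cluster)
    • Initialize BPOfferService (one per nameservice, i.e. per block pool)
    • Each BPOfferService holds several BPServiceActors, one per NameNode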
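  2. The DataNode registers with the NameNode
    • BPServiceActor first obtains a proxy for NameNodeRpcServer
    • It calls registerDatanode on NameNodeRpcServer
    • The DataNode's information is then stored in two main data structures: the in-memory host2DatanodeMap and the network topology (networktopology)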
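  3. The DataNode sends heartbeats to the NameNode
    • Heartbeats also go through the NameNodeRpcServer proxy, once every 3 s
    • The NameNode stores what the DataNode reports, in particular the heartbeat time and the DataNode's storage usage
    • Separately, the NameNode checks whether any DataNode is down, based on whether its last heartbeat is more than 10 min 30 s old; if so, the node is considered dead. Why wait so long? In a distributed system many network problems can delay a heartbeat, so the NameNode cannot decisively conclude that a node is dead; it gives the node some time to see whether it recovers
    • If a DataNode is dead, the NameNode removes the information it keeps for that DataNode
    • For a healthy DataNode, the NameNode's commands are returned in the heartbeat response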
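The DataNode-side hierarchy can be illustrated with a heavily simplified model: one `BlockPoolManager` per DataNode, one `BPOfferService` per nameservice (block pool), and one `BPServiceActor` per NameNode. The classes below are hypothetical stand-ins, not the Hadoop classes, and the nameservice names and NameNode addresses are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the DataNode-side hierarchy.
final class BlockPoolHierarchySketch {

    /** One per NameNode (e.g. the active and the standby of an HA pair). */
    static final class ServiceActor {
        final String nameNodeAddress;

        ServiceActor(String nameNodeAddress) {
            this.nameNodeAddress = nameNodeAddress;
        }
    }

    /** One per nameservice / block pool. */
    static final class OfferService {
        final String nameservice;
        final List<ServiceActor> actors = new ArrayList<>();

        OfferService(String nameservice) {
            this.nameservice = nameservice;
        }
    }

    /** This object plays the single per-DataNode manager. */
    final Map<String, OfferService> offerServices = new LinkedHashMap<>();

    BlockPoolHierarchySketch(Map<String, List<String>> nameNodesByNameservice) {
        for (Map.Entry<String, List<String>> e : nameNodesByNameservice.entrySet()) {
            OfferService bpos = new OfferService(e.getKey());
            for (String address : e.getValue()) {
                bpos.actors.add(new ServiceActor(address)); // one actor per NameNode
            }
            offerServices.put(e.getKey(), bpos);
        }
    }

    int actorCount() {
        int n = 0;
        for (OfferService s : offerServices.values()) {
            n += s.actors.size();
        }
        return n;
    }

    public static void main(String[] args) {
        Map<String, List<String>> conf = new LinkedHashMap<>();
        conf.put("ns1", List.of("nn1:8020", "nn2:8020"));   // HA pair
        conf.put("ns2", List.of("nn3:8020"));               // non-HA nameservice
        BlockPoolHierarchySketch bpm = new BlockPoolHierarchySketch(conf);
        System.out.println(bpm.offerServices.size() + " BPOfferService-like objects, "
                + bpm.actorCount() + " BPServiceActor-like objects");
        // 2 BPOfferService-like objects, 3 BPServiceActor-like objects
    }
}
```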

Finally

That is all of 还单身西装's recently compiled notes on HDFS Source Code Analysis: DataNode Startup and Heartbeat Mechanism (Part 4), DataNode Startup Process.

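The text and images in this post were contributed by users or collected from the web for study and reference; copyright belongs to the original author.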