Overview
The DataNode Startup Flow
1. Reading the DataNode class comment
1.1 The DataNode class Javadoc
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
* have one or many DataNodes. Each DataNode communicates
* regularly with a single NameNode. It also communicates
* with client code and other DataNodes from time to time.
*
* TODO (1)
* A DataNode stores HDFS block files. A single deployment can have many DataNodes.
* Each DataNode communicates with the NameNode periodically, clients can interact
* with DataNodes directly, and DataNodes also talk to one another.
*
* DataNodes store a series of named blocks. The DataNode
* allows client code to read these blocks, or to write new
* block data. The DataNode may also, in response to instructions
* from its NameNode, delete blocks or copy blocks to/from other
* DataNodes.
*
* TODO (2)
* DataNodes store a series of blocks and allow clients to read and write them.
* A DataNode also responds to instructions from the NameNode,
* e.g. delete a block, or copy a block to/from another DataNode.
*
* Note that in HDFS the NameNode never operates on a DataNode directly;
* instead it piggybacks commands on heartbeat responses.
* e.g.:
* Suppose block01 has three replicas and node hadoop03 goes down. The DataNodes
* cannot detect this themselves, but the NameNode can, via the missed heartbeats.
* To get back to three replicas, the NameNode attaches a replication command
* to a heartbeat response.
*
* The DataNode maintains just one critical table:
* block-> stream of bytes (of BLOCK_SIZE or less)
* TODO (3)
* The DataNode maintains one critical table:
* block -> stream of bytes, i.e. something like per-block metadata.
*
*
* This info is stored on a local disk. The DataNode
* reports the table's contents to the NameNode upon startup
* and every so often afterwards.
* TODO (4)
* This information lives on local disk. The DataNode reports it to the
* NameNode at startup, and keeps reporting it periodically afterwards.
*
*
* DataNodes spend their lives in an endless loop of asking
* the NameNode for something to do. A NameNode cannot connect
* to a DataNode directly; a NameNode simply returns values from
* functions invoked by a DataNode.
*
* TODO (5)
* Once started, a DataNode keeps asking the NameNode what it should do: the heartbeat.
* The NameNode cannot operate on a DataNode directly. The DataNode heartbeats to the
* NameNode, and if the NameNode wants that DataNode to do something, it puts the
* instruction into the heartbeat's return value; the DataNode then knows what the
* NameNode wants it to do and executes it.
*
*
* DataNodes maintain an open server socket so that client code
* or other DataNodes can read/write data. The host/port for
* this server is reported to the NameNode, which then sends that
* information to clients or other DataNodes that might be interested.
* TODO (6)
* The DataNode opens a socket server so that clients and other DataNodes can
* read and write data. At startup it reports its hostname and port to the NameNode.
* So if a client (or another DataNode) wants to reach a given DataNode, it first asks
* the NameNode for that DataNode's hostname and port,
* and only then connects to the DataNode itself.
*
* TODO Summary:
* 1. A cluster can have many DataNodes; they are what actually stores the data.
* 2. Once started, a DataNode communicates with the NameNode periodically (heartbeats, block reports).
* 3. The NameNode cannot operate on DataNodes directly; it controls them through commands returned in heartbeat responses.
* 4. After startup, a DataNode opens a socket service (RPC) and waits to be called.
*
**********************************************************/
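Before diving into the code, the command-over-heartbeat model described above is worth pinning down. Below is a toy sketch, not Hadoop's actual protocol classes (ToyNameNodeProtocol, ToyDataNode, and the command strings are made up for illustration): the DataNode drives every exchange, and the NameNode can only act on a DataNode through the return value of the heartbeat call.

import java.util.List;

// The NameNode side never dials out; it only answers this call.
interface ToyNameNodeProtocol {
    List<String> sendHeartbeat(String datanodeId);
}

class ToyDataNode {
    private final ToyNameNodeProtocol namenode; // RPC proxy to the NameNode
    private final String id;

    ToyDataNode(String id, ToyNameNodeProtocol namenode) {
        this.id = id;
        this.namenode = namenode;
    }

    void heartbeatLoop() throws InterruptedException {
        while (true) {
            // "What should I do?" -- asked once per heartbeat interval (3 s in HDFS)
            List<String> commands = namenode.sendHeartbeat(id);
            for (String cmd : commands) {
                // e.g. "replicate block01 to hadoop04", "delete block02"
                System.out.println(id + " executing command: " + cmd);
            }
            Thread.sleep(3000);
        }
    }
}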
2. DataNode initialization
2.1 main
public static void main(String args[]) {
if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
System.exit(0);
}
secureMain(args, null);
}
2.2 secureMain
public static void secureMain(String args[], SecureResources resources) {
int errorCode = 0;
try {
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
//TODO initialize the DataNode
DataNode datanode = createDataNode(args, null, resources);
if (datanode != null) {
//TODO block here so the process stays alive; this is why jps shows a DataNode process
datanode.join();
} else {
errorCode = 1;
}
} catch (Throwable e) {
LOG.fatal("Exception in secureMain", e);
terminate(1, e);
} finally {
// We need to terminate the process here because either shutdown was called
// or some disk related conditions like volumes tolerated or volumes required
// condition was not met. Also, In secure mode, control will go to Jsvc
// and Datanode process hangs if it does not exit.
LOG.warn("Exiting Datanode");
terminate(errorCode);
}
}
2.3 createDataNode
/** Instantiate & Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
//TODO instantiate the DataNode
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
//start the daemon threads
dn.runDatanodeDaemon();
}
return dn;
}
2.4 instantiateDataNode
/** Instantiate a single datanode object, along with its secure resources.
* This must be run by invoking {@link DataNode#runDatanodeDaemon()}
* subsequently.
*/
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
if (conf == null)
conf = new HdfsConfiguration();
if (args != null) {
// parse generic hadoop options
GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
args = hParser.getRemainingArgs();
}
if (!parseArguments(args, conf)) {
printUsage(System.err);
return null;
}
Collection<StorageLocation> dataLocations = getStorageLocations(conf);
UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
//TODO the important call
return makeInstance(dataLocations, conf, resources);
}
2.5 makeInstance
/**
* Make an instance of DataNode after ensuring that at least one of the
* given data directories (and their parent directories, if necessary)
* can be created.
* @param dataDirs List of directories, where the new DataNode instance should
* keep its files.
* @param conf Configuration instance to use.
* @param resources Secure resources needed to run under Kerberos
* @return DataNode instance for given list of data dirs and conf, or null if
* no directory from this directory list can be created.
* @throws IOException
*/
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
// disk check
DataNodeDiskChecker dataNodeDiskChecker =
new DataNodeDiskChecker(permission);
List<StorageLocation> locations =
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
DefaultMetricsSystem.initialize("DataNode");
assert locations.size() > 0 : "number of data directories should be > 0";
//TODO create our DataNode
return new DataNode(conf, locations, resources);
}
2.6 new DataNode
/**
* Create the DataNode given a configuration, an array of dataDirs,
* and a namenode proxy
*/
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
super(conf);
...
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
//TODO start the DataNode
startDataNode(conf, dataDirs, resources);
} catch (IOException ie) {
shutdown();
throw ie;
}
final int dncCacheMaxSize =
conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ;
datanodeNetworkCounts =
CacheBuilder.newBuilder()
.maximumSize(dncCacheMaxSize)
.build(new CacheLoader<String, Map<String, Long>>() {
@Override
public Map<String, Long> load(String key) throws Exception {
final Map<String, Long> ret = new HashMap<String, Long>();
ret.put("networkErrors", 0L);
return ret;
}
});
}
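As an aside, the datanodeNetworkCounts field built above is a Guava LoadingCache: get(key) runs the CacheLoader when the key is absent, so a per-host error-counter map springs into existence on first access. A minimal self-contained sketch of the same pattern (the class name, cache size, and host string are made up):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class NetworkCountsDemo {
    public static void main(String[] args) throws ExecutionException {
        LoadingCache<String, Map<String, Long>> counts = CacheBuilder.newBuilder()
            .maximumSize(64) // oldest entries are evicted beyond this size
            .build(new CacheLoader<String, Map<String, Long>>() {
                @Override
                public Map<String, Long> load(String host) {
                    Map<String, Long> m = new HashMap<>();
                    m.put("networkErrors", 0L); // counter starts at zero
                    return m;
                }
            });
        // first access creates the entry via load(); later accesses reuse it
        counts.get("hadoop03").merge("networkErrors", 1L, Long::sum);
        System.out.println(counts.get("hadoop03")); // {networkErrors=1}
    }
}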
2.7 startDataNode
/**
* This method starts the data node with the specified conf.
*
* @param conf - the configuration
* if conf's CONFIG_PROPERTY_SIMULATED property is set
* then a simulated storage based data node is created.
*
* @param dataDirs - only for a non-simulated storage data node
* @throws IOException
*/
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
SecureResources resources
) throws IOException {
// settings global for all BPs in the Data Node
this.secureResources = resources;
synchronized (this) {
this.dataDirs = dataDirs;
}
this.conf = conf;
this.dnConf = new DNConf(conf);
checkSecureConfig(dnConf, conf, resources);
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
if (dnConf.maxLockedMemory > 0) {
if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) is greater than zero and native code is not available.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
}
if (Path.WINDOWS) {
NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
} else {
long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
if (dnConf.maxLockedMemory > ulimit) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) of %d bytes is more than the datanode's available" +
" RLIMIT_MEMLOCK ulimit of %d bytes.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
dnConf.maxLockedMemory,
ulimit));
}
}
}
LOG.info("Starting DataNode with maxLockedMemory = " +
dnConf.maxLockedMemory);
// TODO the storage object
storage = new DataStorage();
// global DN settings
registerMXBean();
//TODO initialize the DataXceiver server
initDataXceiver(conf);
//TODO initialize the HTTP server
startInfoServer(conf);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
// Login is done by now. Set the DN user name.
dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.info("dnUserName = " + dnUserName);
LOG.info("supergroup = " + supergroup);
initIpcServer(conf);
metrics = DataNodeMetrics.create(conf, getDisplayName());
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf);
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
2.8 initDataXceiver
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
TcpPeerServer tcpPeerServer;
if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources);
} else {
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf));
}
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
//TODO instantiate a DataXceiverServer: the service the DataNode uses to receive data from clients and from other DataNodes
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
//run it as a daemon thread
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
this.localDataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(domainPeerServer, conf, this));
LOG.info("Listening on UNIX domain socket: " +
domainPeerServer.getBindPath());
}
}
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
2.9 startInfoServer
/**
* @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
* for information related to the different configuration options and
* Http Policy is decided.
*/
private void startInfoServer(Configuration conf)
throws IOException {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
//the NameNode also starts an HttpServer2 during its own startup
//TODO used to serve HTTP requests
HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("datanode")
.setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.addEndpoint(URI.create("http://localhost:0"))
.setFindPort(true);
this.infoServer = builder.build();
//TODO several servlets are bound onto this HTTP server
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
FileChecksumServlets.GetServlet.class);
this.infoServer.setAttribute("datanode", this);
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
this.infoServer.addServlet(null, "/blockScannerReport",
BlockScanner.Servlet.class);
//start the HTTP server
this.infoServer.start();
InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
// SecureDataNodeStarter will bind the privileged port to the channel if
// the DN is started by JSVC, pass it along.
ServerSocketChannel httpServerChannel = secureResources != null ?
secureResources.getHttpServerChannel() : null;
this.httpServer = new DatanodeHttpServer(conf, jettyAddr, httpServerChannel);
httpServer.start();
if (httpServer.getHttpAddress() != null) {
infoPort = httpServer.getHttpAddress().getPort();
}
if (httpServer.getHttpsAddress() != null) {
infoSecurePort = httpServer.getHttpsAddress().getPort();
}
}
2.10 initIpcServer
private void initIpcServer(Configuration conf) throws IOException {
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
// Add all the RPC protocols that the Datanode implements
RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
new ClientDatanodeProtocolServerSideTranslatorPB(this);
BlockingService service = ClientDatanodeProtocolService
.newReflectiveBlockingService(clientDatanodeProtocolXlator);
ipcServer = new RPC.Builder(conf)
.setProtocol(ClientDatanodeProtocolPB.class)
.setInstance(service)
.setBindAddress(ipcAddr.getHostName())
.setPort(ipcAddr.getPort())
.setNumHandlers(
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build();
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this);
service = InterDatanodeProtocolService
.newReflectiveBlockingService(interDatanodeProtocolXlator);
DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
ipcServer);
TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
ipcServer);
LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
// set service-level authorization security policy
if (conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
}
2.11 new BlockPoolManager
//TODO create the BlockPoolManager
/**
 * Block pools: a non-federated cluster has exactly one block pool.
 * With federation there are multiple NameNodes, i.e. multiple nameservices,
 * and each federation member (nameservice) has its own block pool.
 * Suppose a cluster has 4 NameNodes forming two federation members:
 *   federation 1: hadoop1 (Active), hadoop2 (Standby)  (sharing one block pool)
 *   federation 2: hadoop3 (Active), hadoop4 (Standby)  (sharing one block pool)
 */
blockPoolManager = new BlockPoolManager(this);
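For concreteness, here is a hedged sketch of the configuration that yields the "two federations, four NameNodes" layout described above. The class name and hostnames are made up; the dfs.nameservices, dfs.ha.namenodes.*, and dfs.namenode.rpc-address.* keys are the standard ones that DFSUtil.getNNServiceRpcAddressesForCluster reads to build its nameservice-to-address map.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class FederationConfDemo {
    public static void main(String[] args) {
        // two federated nameservices, each an HA pair (hostnames are made up)
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "ns1,ns2");
        conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
        conf.set("dfs.ha.namenodes.ns2", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.ns1.nn1", "hadoop1:8020");
        conf.set("dfs.namenode.rpc-address.ns1.nn2", "hadoop2:8020");
        conf.set("dfs.namenode.rpc-address.ns2.nn1", "hadoop3:8020");
        conf.set("dfs.namenode.rpc-address.ns2.nn2", "hadoop4:8020");
        // refreshNamenodes(conf) would then build one BPOfferService per
        // nameservice, each holding two BPServiceActors (one per NameNode)
    }
}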
2.12 refreshNamenodes
void refreshNamenodes(Configuration conf)
throws IOException {
LOG.info("Refresh request received for nameservices: " + conf.get
(DFSConfigKeys.DFS_NAMESERVICES));
Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf);
synchronized (refreshNamenodesLock) {
//TODO the important call
doRefreshNamenodes(newAddressMap);
}
}
2.13 doRefreshNamenodes
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newLinkedHashSet();
//the nameservice (federation) IDs
Set<String> toAdd = Sets.newLinkedHashSet();
Set<String> toRemove;
synchronized (this) {
// Step 1. For each of the new nameservices, figure out whether
// it's an update of the set of NNs for an existing NS,
// or an entirely new nameservice.
//TODO In the common case an HDFS cluster is an HA deployment:
//a single nameservice (hadoop1 + hadoop2)
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
//TODO toAdd collects the nameservices to start; each federation member is one nameservice
toAdd.add(nameserviceId);
}
}
// Step 2. Any nameservices we currently have but are no longer present
// need to be removed.
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));
assert toRefresh.size() + toAdd.size() ==
addrMap.size() :
"toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
" toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
" toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
// Step 3. Start new nameservices
if (!toAdd.isEmpty()) {
LOG.info("Starting BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toAdd));
//TODO iterate over all nameservices; with HA each one has two NameNodes
//in an ordinary (non-federated) cluster toAdd holds just one entry
for (String nsToAdd : toAdd) {
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToAdd).values());
//TODO the important relationships:
// one nameservice maps to one BPOfferService
// each NameNode within a nameservice maps to one BPServiceActor
//so normally one BPOfferService holds two BPServiceActors
BPOfferService bpos = createBPOS(addrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
//TODO heartbeats
startAll();
}
// Step 4. Shut down old nameservices. This happens outside
// of the synchronized(this) lock since they need to call
// back to .remove() from another thread
if (!toRemove.isEmpty()) {
LOG.info("Stopping BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toRemove));
for (String nsToRemove : toRemove) {
BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
bpos.stop();
bpos.join();
// they will call remove on their own
}
}
// Step 5. Update nameservices whose NN list has changed
if (!toRefresh.isEmpty()) {
LOG.info("Refreshing list of NNs for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toRefresh));
//iterate over the nameservices whose NN list changed
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToRefresh).values());
bpos.refreshNNList(addrs);
}
}
}
2.14 createBPOS
//TODO the important relationships:
// one nameservice maps to one BPOfferService
// each NameNode within a nameservice maps to one BPServiceActor
//so normally one BPOfferService holds two BPServiceActors
BPOfferService bpos = createBPOS(addrs);
/**
* Extracted out for test purposes.
*/
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
return new BPOfferService(nnAddrs, dn);
}
2.15 new BPOfferService
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
this.dn = dn;
//bind each NameNode to a BPServiceActor of its own
for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
}
}
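Putting 2.11 through 2.15 together, the object graph for the hypothetical two-nameservice configuration sketched in 2.11 would look like this (hostnames are the made-up ones from that sketch):

BlockPoolManager
 |- BPOfferService(ns1) -> BPServiceActor(hadoop1:8020), BPServiceActor(hadoop2:8020)
 |- BPOfferService(ns2) -> BPServiceActor(hadoop3:8020), BPServiceActor(hadoop4:8020)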
2.16 DataNode initialization flow so far (diagram omitted)
3. DataNode registration
3.1 startAll, called from doRefreshNamenodes
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
//TODO iterate over all the BPOfferServices, i.e. over all nameservices
for (BPOfferService bpos : offerServices) {
// TODO important
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
3.2 bpos.start()
//This must be called only by blockPoolManager
void start() {
//TODO bpServices holds multiple actors, one per NameNode
for (BPServiceActor actor : bpServices) {
//TODO the DataNode registers and heartbeats through these actors
/**
 * first nameservice: hadoop1, hadoop2
 * actor1 -> hadoop1
 * actor2 -> hadoop2
 */
actor.start();
}
}
3.3 actor.start();
//This must be called only by BPOfferService
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {
//Thread is started already
return;
}
bpThread = new Thread(this, formatThreadName());
//start as a daemon thread
bpThread.setDaemon(true); // needed for JUnit testing
//TODO start the thread; next we examine its run() method
bpThread.start();//run
}
3.4 BPServiceActor's run()
try {
while (true) {
// init stuff
try {
// setup storage
//TODO the core registration code
//the while(true) is there so we keep retrying until the handshake succeeds
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
//TODO on failure, sleep 5 seconds
//TODO and then try again
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
//registration is finished here
3.5 connectToNNAndHandshake()
private void connectToNNAndHandshake() throws IOException {
//TODO obtain a proxy to the NameNode
//i.e. the RPC client side
// get NN proxy
bpNamenode = dn.connectToNN(nnAddr);
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
//TODO verify the NamespaceInfo
//TODO the DataNode checks it is consistent across the HA pair
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
//TODO register the DataNode
register(nsInfo);
}
3.6 register(nsInfo);
/**
* Register one bp with the corresponding NameNode
* <p>
* The bpDatanode needs to register with the namenode on startup in order
* 1) to report which storage it is serving now and
* 2) to receive a registrationID
*
* issued by the namenode to recognize registered datanodes.
*
* @param nsInfo current NamespaceInfo
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
void register(NamespaceInfo nsInfo) throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
//TODO build the registration info, i.e. this node's own details, e.g. its storage
bpRegistration = bpos.createRegistration();
LOG.info(this + " beginning handshake with NN");
while (shouldRun()) {
try {
// Use returned registration from namenode with updated fields
//TODO invoke registerDatanode on the server side
//bpNamenode stands in for the NameNode: it is the client-side proxy of the server
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
//TODO reaching this line means registration has completed
bpRegistration.setNamespaceInfo(nsInfo);
break;
} catch(EOFException e) { // namenode might have just restarted
LOG.info("Problem connecting to server: " + nnAddr + " :"
+ e.getLocalizedMessage());
sleepAndLogInterrupts(1000, "connecting to server");
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
sleepAndLogInterrupts(1000, "connecting to server");
}
}
LOG.info("Block pool " + this + " successfully registered with NN");
bpos.registrationSucceeded(this, bpRegistration);
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(dnConf.initialBlockReportDelay);
}
3.7 NameNodeRpcServer's registerDatanode
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
//has the NameNode finished starting up?
checkNNStartup();
verifySoftwareVersion(nodeReg);
//TODO register the DataNode
namesystem.registerDatanode(nodeReg);
return nodeReg;
}
3.8 namesystem.registerDatanode(nodeReg);
/**
* Register Datanode.
* <p>
* The purpose of registration is to identify whether the new datanode
* serves a new data storage, and will report new data block copies,
* which the namenode was not aware of; or the datanode is a replacement
* node for the data storage that was previously served by a different
* or the same (in terms of host:port) datanode.
* The data storages are distinguished by their storageIDs. When a new
* data storage is reported the namenode issues a new unique storageID.
* <p>
* Finally, the namenode returns its namespaceID as the registrationID
* for the datanodes.
* namespaceID is a persistent attribute of the name space.
* The registrationID is checked every time the datanode is communicating
* with the namenode.
* Datanodes with inappropriate registrationID are rejected.
* If the namenode stops, and then restarts it can restore its
* namespaceID and will continue serving the datanodes that has previously
* registered with the namenode without restarting the whole cluster.
*
* @see org.apache.hadoop.hdfs.server.datanode.DataNode
*/
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
writeLock();
try {
//TODO DatanodeManager handles all DataNode-related concerns
getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
checkSafeMode();
} finally {
writeUnlock();
}
}
3.9 DatanodeManager's registerDatanode()
// register new datanode
//TODO register the DataNode
addDatanode(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because its is done when the descriptor is created
//TODO add the newly registered DataNode to the HeartbeatManager
//for the heartbeat mechanism later on
heartbeatManager.addDatanode(nodeDescr);
3.10 addDatanode(nodeDescr);
/** Add a datanode. */
//TODO registering a DataNode boils down to inserting it into a set of data structures
void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
synchronized(datanodeMap) {
//TODO has this datanode been seen before?
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
//TODO add an entry to the network topology structure
networktopology.add(node); // may throw InvalidTopologyException
//TODO add an entry to the in-memory host map
host2DatanodeMap.add(node);
//based on the structures above, check whether the cluster now spans multiple racks
checkIfClusterIsNowMultiRack(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
+ "node " + node + " is added to datanodeMap.");
}
}
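The one subtle line in addDatanode is host2DatanodeMap.remove(datanodeMap.put(...)): put returns the descriptor this UUID previously mapped to (if any), which is removed from the host map before the new entry goes in, keeping the two maps consistent. A toy sketch of that idiom, with plain strings standing in for DatanodeDescriptor (ToyDatanodeManager is a made-up name):

import java.util.HashMap;
import java.util.Map;

class ToyDatanodeManager {
    // datanodeUuid -> host:port (the authoritative map)
    private final Map<String, String> datanodeMap = new HashMap<>();
    // host:port -> datanodeUuid (fast lookup by address)
    private final Map<String, String> host2DatanodeMap = new HashMap<>();

    synchronized void addDatanode(String uuid, String hostPort) {
        // put() hands back whatever this uuid previously mapped to, if anything
        String previousHostPort = datanodeMap.put(uuid, hostPort);
        if (previousHostPort != null) {
            host2DatanodeMap.remove(previousHostPort); // keep the two maps consistent
        }
        host2DatanodeMap.put(hostPort, uuid);
        // the real code additionally inserts the node into the NetworkTopology tree
    }
}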
3.11 DataNode registration flow (diagram omitted)
4. The DataNode heartbeat mechanism
4.1 heartbeatManager.addDatanode(nodeDescr);
synchronized void addDatanode(final DatanodeDescriptor d) {
// update in-service node count
//store the node in the various bookkeeping structures
stats.add(d);
//append the datanode's info to the datanodes list
datanodes.add(d);
d.isAlive = true;
}
4.2 BPServiceActor's run()
while (shouldRun()) {
try {
//TODO send heartbeats
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
4.3 offerService()
while (shouldRun()) {
try {
final long startTime = monotonicNow();
//
// Every so often, send heartbeat or block-report
//
//TODO a heartbeat is sent every three seconds
//startTime = the current time
//lastHeartbeat = the time of the last heartbeat
//dnConf.heartBeatInterval = 3 * 1000L, i.e. three seconds
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
//TODO record this as the latest heartbeat time
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
/**
 * The NameNode never connects to the DataNode.
 * The DataNode sends a heartbeat to the NameNode;
 * the NameNode replies with a set of commands,
 * and the DataNode carries out whatever those commands say.
 */
//TODO send the heartbeat; the return value is the NameNode's command response
HeartbeatResponse resp = sendHeartBeat();
assert resp != null;
dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
//
// Important that this happens before processCommand below,
// since the first heartbeat to a new active might have commands
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands = monotonicNow();
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands = monotonicNow();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
+ " commands from NN");
}
}
}
4.4 sendHeartBeat()
HeartbeatResponse sendHeartBeat() throws IOException {
//TODO this runs once every three seconds
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
//TODO send the heartbeat through the NameNode proxy obtained earlier
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
}
4.5 NameNodeRpcServer's sendHeartbeat()
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
//TODO handle the heartbeat sent by the DataNode
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary);
}
4.6 FSNamesystem's handleHeartbeat()
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
readLock();
try {
//get datanode commands
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
//TODO the NameNode processes the heartbeat sent over by the DataNode
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
getFSImage().getLastAppliedOrWrittenTxId());
//TODO build the response for the DataNode
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
} finally {
readUnlock();
}
}
4.7 DatanodeManager's handleHeartbeat()
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
//fetch the info this DataNode supplied when it registered
//TODO look the DataNode up in the existing datanodeMap
//a hit means the node has registered before
//on first contact the map has no entry for it
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
//TODO update the heartbeat's key information
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
4.8 HeartbeatManager's updateHeartbeat()
synchronized void updateHeartbeat(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
// TODO the important call
node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
}
4.9 DatanodeDescriptor's updateHeartbeat()
/**
* Updates stats from datanode heartbeat.
*/
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
heartbeatedSinceRegistration = true;
}
4.10 DatanodeDescriptor's updateHeartbeatState()
//TODO update the storage info
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
//TODO update the timestamp of the last heartbeat
/**
 * datanode -> namenode heartbeat -> 2020/07/05 10:10:10
 * datanode -> namenode heartbeat -> 2020/07/05 10:10:13
 *
 * Some thread must be sweeping over every node's last heartbeat time,
 * subtracting it from the current time; if the gap exceeds a threshold,
 * the datanode is considered dead.
 */
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
4.11 Important services started during NameNode initialization
Why did we jump over here? Consider: the service that decides whether a DataNode is down, and tracks nodes joining and leaving, lives inside the NameNode. During NameNode initialization a few important services are started, among them the HeartbeatManager thread. As the name suggests, it manages DataNode heartbeats, so we need to look at its run() method to see under exactly what conditions a DataNode is considered dead.
//TODO safe-mode accounting
setBlockTotal();
//TODO start the important services
blockManager.activate(conf);
public void activate(Configuration conf) {
//start the pending-replication monitor thread
pendingReplications.start();
//TODO start the heartbeat-management service
datanodeManager.activate(conf);
this.replicationThread.start();
}
void activate(final Configuration conf) {
//start the service that manages decommissioning datanodes
decomManager.activate(conf);
//TODO manage heartbeats
heartbeatManager.activate(conf);
}
void activate(Configuration conf) {
heartbeatThread.start();
}
4.12 HeartbeatManager's Monitor.run() (it runs as a thread)
/** Periodically check heartbeat and update block key */
private class Monitor implements Runnable {
private long lastHeartbeatCheck;
private long lastBlockKeyUpdate;
@Override
public void run() {
// namesystem.isRunning() stays true while the NameNode is up
while(namesystem.isRunning()) {
try {
final long now = Time.monotonicNow();
//heartbeatRecheckInterval: 5 minutes by default (dfs.namenode.heartbeat.recheck-interval)
//lastHeartbeatCheck: the time of the last heartbeat check
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
//TODO heartbeat check
heartbeatCheck();
lastHeartbeatCheck = now;
}
if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true;
}
}
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
}
}
}
}
4.13 heartbeatCheck()
synchronized(this) {
//TODO iterate over all the datanodes
/**
 * Registration put each datanode's info into this structure, and the
 * heartbeat update we just walked through modifies the very same
 * DataNode entries inside it.
 */
for (DatanodeDescriptor d : datanodes) {
//decide whether this datanode is dead
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
}
if (d.isStale(dm.getStaleInterval())) {
numOfStaleNodes++;
}
DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
for(DatanodeStorageInfo storageInfo : storageInfos) {
if (storageInfo.areBlockContentsStale()) {
numOfStaleStorages++;
}
if (failedStorage == null &&
storageInfo.areBlocksOnFailedStorage() &&
d != dead) {
failedStorage = storageInfo;
}
}
}
4.14 isDatanodeDead()
/** Is the datanode dead? */
boolean isDatanodeDead(DatanodeDescriptor node) {
//this is where a datanode is judged dead or alive
/**
 * heartbeatExpireInterval =
 *     2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds
 * with heartbeatRecheckInterval = 5 minutes and heartbeatIntervalSeconds = 3,
 * a datanode is declared down after 2 * 5 min + 10 * 3 s = 10 minutes 30 seconds
 */
return (node.getLastUpdateMonotonic() <
(monotonicNow() - heartbeatExpireInterval));
}
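Working the arithmetic from the comment through, with the default values of dfs.namenode.heartbeat.recheck-interval and dfs.heartbeat.interval (a self-contained sketch; the class name is made up):

public class HeartbeatExpiryDemo {
    public static void main(String[] args) {
        long heartbeatRecheckIntervalMs = 5 * 60 * 1000; // dfs.namenode.heartbeat.recheck-interval, default 5 min
        long heartbeatIntervalSeconds = 3;               // dfs.heartbeat.interval, default 3 s
        long heartbeatExpireIntervalMs =
            2 * heartbeatRecheckIntervalMs + 10 * 1000 * heartbeatIntervalSeconds;
        System.out.println(heartbeatExpireIntervalMs);   // 630000 ms = 10 min 30 s
    }
}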
4.15 Heartbeat mechanism flow (diagram omitted)
Summary of the DataNode startup flow
- The DataNode first initializes itself
- It initializes DataStorage, which manages the DataNode's on-disk storage
- It initializes DataXceiver, the core of the whole DataNode (covered in a later post)
- It initializes the DatanodeHttpServer
- It starts an HttpServer2 with several servlets bound to it
- It initializes the RPC server
- It creates the BlockPoolManager, which holds one BPOfferService per nameservice (federation member)
- It initializes the BPOfferServices, one per nameservice
- Each BPOfferService holds several BPServiceActors, one BPServiceActor per NameNode
- The DataNode registers with the NameNode
- A BPServiceActor first creates an RPC proxy for the NameNodeRpcServer
- It calls registerDatanode on the NameNodeRpcServer
- The DataNode's info is then stored in two main data structures: the in-memory
  host2DatanodeMap,
  and the network topology structure networktopology
- The DataNode heartbeats with the NameNode
- Heartbeats also go through the NameNodeRpcServer proxy, once every 3 seconds
- What the DataNode reports is eventually stored on the NameNode side
- the heartbeat timestamp
- the DataNode's storage state
- The NameNode then checks whether any DataNode has gone down: a node whose last heartbeat is more than 10 minutes 30 seconds old is declared dead. Why such a long window? In a distributed system there are plenty of transient network problems, so the NameNode should not be hasty in declaring a node dead; it gives the node time to come back.
- If a node is dead, its DataNode info is removed from what the NameNode keeps
- Otherwise the NameNode returns its commands in the heartbeat response