概述
List of articles
- 一. Storage
- 二.DataStorage
- 三. BlockPoolSliceStorage
一. Storage
Storage是一个抽象类, 为Datanode、 Namenode提供抽象的存储服务。 Storage类管理着当前节点上( 可以是Datanode或者Namenode) 所有的存储目录, 每个存储目录都由一个StorageDirectory对象管理, Storage用一个线性表字段storageDirs存储它管理的所有StorageDirectory, 并通过Dirlterator迭代器进行遍历。
private final List<StorageDirectory> storageDirs = new CopyOnWriteArrayList<>();
二.DataStorage
DataStorage继承自Storage抽象类, 提供了管理Datanode存储空间的功能。
在HDFSFederation架构中, 一个Datanode可以保存多个命名空间的数据块, 每个命名空间在Datanode磁盘上都拥有一个独立的块池( BlockPool) , 这个块池会分布在Datanode的所有存储目录下, 它们共同保存了这个块池在当前Datanode上的所有数据块。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间 , DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池BlockPoolSliceStorage对象的引用。
// Maps block pool IDs to block pool storage
private final Map<String, BlockPoolSliceStorage> bpStorageMap = Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>());
Datanode在启动时会调用DataStorage提供的方法初始化Datanode的存储空间, 在HDFS Federation架构中, Datanode会保存多个命名空间的数据块。 对于每一个命名空间,Datanode都会构造一个BPOfferService类维护与这个命名空间Namenode的通信 。 当BPOfferService中的BPServiceActor类与该命名空间的Namenode握手成功后, 就会调用DataNode.initBlockPool()初始化该命名空间的块池。 DataNode.initBlockPool()方法最终会调用DataStorage.recoverTransitionRead()来执行块池存储的初始化操作。
DataNode#initStorage初始化只在和第一个namenode的握手完成时完成一次。
/**
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
*/
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory = FsDatasetSpi.Factory.getFactory(getConf());
if (!factory.isSimulated()) {
final StartupOption startOpt = getStartupOption(getConf());
if (startOpt == null) {
throw new IOException("Startup option not set.");
}
//bpid: BP-451827885-192.168.8.156-1584099133244
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
synchronized (this) {
storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid={};bpid={};lv={};" + "nsInfo={};dnuuid={}",
bpStorage.getNamespaceID(), bpid, storage.getLayoutVersion(),
nsInfo, storage.getDatanodeUuid());
}
// If this is a newly formatted DataNode then assign a new DatanodeUuid.
checkDatanodeUuid();
synchronized(this) {
if (data == null) {
data = factory.newInstance(this, storage, getConf());
}
}
}
recoverTransitionRead()方法调用了同名的重载方法recoverTransitionRead()初始化Datanode存储空间, 重载的recoverTransitionRead()方法会调用addStorageLocations()方法对Datanode存储空间进行初始化, 然后将DataStorage.initialized设置为true。
/**
*
* 分析特定块池的存储目录。
* 如果需要,从以前的转换中恢复。
* 如果需要,根据命名空间信息执行fs状态转换。
* 读取存储信息。
* 此方法应该在多个DN线程之间同步。只有第一个DN线程执行DN级别的 storage dir recoverTransitionRead。
*
* Analyze storage directories for a specific block pool.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
* Read storage info.
* <br>
* This method should be synchronized between multiple DN threads. Only the
* first DN thread does DN level storage dir recoverTransitionRead.
*
* @param datanode DataNode
* @param nsInfo Namespace info of namenode corresponding to the block pool
* @param dataDirs Storage directories
* @param startOpt startup option
* @throws IOException on error
*/
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
// 调用addStorageLocations()方法对Datanode存储空间进行初始化
if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
throw new IOException("All specified directories have failed to load.");
}
}
/**
* 加载一个存储目录。如果需要,从以前的转换中恢复。
*
* Load one storage directory. Recover from previous transitions if required.
*
* @param nsInfo namespace information
* @param dataDir the root path of the storage directory
* @param startOpt startup option
* @return the StorageDirectory successfully loaded.
* @throws IOException
*/
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
StorageLocation location, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
StorageDirectory sd = new StorageDirectory(
nsInfo.getBlockPoolID(), null, true, location);
try {
// 调用analyzeStorage()方法分析当前StorageDirectory的状态
StorageState curState = sd.analyzeStorage(startOpt, this, true);
// sd is locked but not opened
// 根据curState恢复状态
switch (curState) {
//存储目录状态正常, 不用执行任何操作
case NORMAL:
break;
//对于不存在的情况, 则直接忽略
case NON_EXISTENT:
LOG.info("Block pool storage directory for location {} and block pool"
+ " id {} does not exist", location, nsInfo.getBlockPoolID());
throw new IOException("Storage directory for location " + location +
" and block pool id " + nsInfo.getBlockPoolID() +
" does not exist");
//没有格式化时, 调用format()方法格式化数据目录
case NOT_FORMATTED: // format
LOG.info("Block pool storage directory for location {} and block pool"
+ " id {} is not formatted. Formatting ...", location,
nsInfo.getBlockPoolID());
format(sd, nsInfo);
break;
default: // recovery part is common
//对于其他情况, 则调用StorageDirectory.doRecover()恢复到NORMAL状态
sd.doRecover(curState);
}
// 2. Do transitions
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
//调用doTransition()执行启动操作, 启动选项通过startOpt参数传递
if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {
// 3. Check CTime and update successfully loaded storage.
if (getCTime() != nsInfo.getCTime()) {
throw new IOException("Datanode CTime (=" + getCTime()
+ ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
}
//3. 对于每一个成功执行的存储目录, 写入VERSION文件
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
}
return sd;
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
}
将当前存储目录恢复为正常状态之后, addStorageLocations()会调用doTransition()方法执行启动选项定义的操作。 doTransition()方法判断如果启动选项是ROLLBACK, 则调用doRollback()方法进行回滚操作。 如果存储目录记录的文件系统布局版本号(VERSION文件记录) 与内存中的版本号一致, 则Datanode正常启动; 如果存储目录记录的版本号小于内存中的版本号, 则调用doUpgrade()方法升级(注意layoutVersion为负数) 。
/**
*
* 分析是否需要转换BP状态,必要时执行。
*
* Analyze whether a transition of the BP state is required and
* perform it if necessary.
* <br>
* Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime.
* Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime Regular
* startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
*
* @param sd storage directory <SD>/current/<bpid>
* @param nsInfo namespace info
* @param startOpt startup option
* @return true if the new properties has been written.
*/
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
// 数据来自于外部存储 什么都不做
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
return false; // regular startup for PROVIDED storage directories
}
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
// 如果启动选项是ROLLBACK, 则调用doRollback()进行回滚操作
Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
" both be present.");
doRollback(sd, nsInfo); // rollback if applicable
} else if (startOpt == StartupOption.ROLLBACK &&
!sd.getPreviousDir().exists()) {
// Restore all the files in the trash. The restored files are retained
// during rolling upgrade rollback. They are deleted during rolling
// upgrade downgrade.
// 还原垃圾箱中的所有文件。在滚动升级回滚期间将保留还原的文件。它们在滚动升级降级期间被删除。
int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
LOG.info("Restored {} block files from trash.", restored);
}
//检查升级是否成功
readProperties(sd);
checkVersionUpgradable(this.layoutVersion);
assert this.layoutVersion >= HdfsServerConstants.DATANODE_LAYOUT_VERSION : "Future version is not allowed";
// 检测 NamespaceID 是否相等 , 不相等 则报错
if (getNamespaceID() != nsInfo.getNamespaceID()) {
throw new IOException("Incompatible namespaceIDs in "
+ sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
+ nsInfo.getNamespaceID() + "; datanode namespaceID = "
+ getNamespaceID());
}
// 检测 blockpoolID 是否相等 , 不相等 则报错
if (!blockpoolID.equals(nsInfo.getBlockPoolID())) {
throw new IOException("Incompatible blockpoolIDs in "
+ sd.getRoot().getCanonicalPath() + ": namenode blockpoolID = "
+ nsInfo.getBlockPoolID() + "; datanode blockpoolID = "
+ blockpoolID);
}
// 如果版本相同 并且创建时间相同 正常启动
// TODO 为什么要加一个验证创建时间 ??????
if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION && this.cTime == nsInfo.getCTime()) {
return false; // regular startup
}
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
LOG.info("Restored {} block files from trash " +
"before the layout upgrade. These blocks will be moved to " +
"the previous directory during the upgrade", restored);
}
//磁盘版本号小于代码版本号, 则调用doUpgrade()升级
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo, callables, conf); // upgrade
return true;
}
//磁盘版本号大于Datanode支持的版本号, 则抛出异常
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
// must shutdown
throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
+ " CTime = " + this.getCTime()
+ " is newer than the namespace state: LV = "
+ nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
}
doTransition()方法会分析启动选项以及存储目录中的文件系统布局版本号, 然后分别调用doRollback()、 doUpgrade()进行回滚或者升级操作, 但是doTransition()方法中并没有执行提交操作(finalize) 的入口。 这是因为提交操作是Namenode通过心跳响应携带的名字节点指令触发doFinalize()方法执行的, 并不是通过Datanode启动选项触发的.
三. BlockPoolSliceStorage
在HDFS Federation架构中, 一个Datanode可以保存多个块池的数据块, 每个块池的数据块都会分布在Datanode所有的存储目录下。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间, DataStorage类则定义了bpStorageMap字段保存Datanode
上所有块池的BlockPoolSliceStorage对象的引用。
DataStorage类管理着整个Datanode的存储, 包括Datanode定义的多个存储目录。BlockPoolSliceStorage类则管理着一个块池的存储, 包括分布在Datanode的多个存储目录下的块池目录。
所以BlockPoolSliceStorage类的功能与DataStorage类基本类似, 包括以下几点。
■ 格式化一个新的块池存储空间。
■ 恢复块池的异常状态。
■ 在升级过程中保存上一版本的快照。
■ 回滚到上一版本。
■ 提交升级, 并删除上一版本的快照。
BlockPoolSliceStorage的入口方法也是recoverTransitionRead(), DataStorage.recoverTransitionRead()方法调用的, DataStorage.recoverTransitionRead()会首先执行Datanode存储的初始化操作, 然后调用BlockPoolSliceStorage.recoverTransitionRead()执行块池存储的初始化操作。 块池存储的升级、 回滚以及提交操作是由BlockPoolSliceStorage的doUpgrade()、 doRollback()以及
doFinalize()方法执行的, 这三个方法都是在块池目录下执行对应的操作。recoverTransitionRead()、 doUpgrade()、 doRollback()以及doFinalize()方法的实现逻辑与DataStorage的同名方法基本相同.
艾玛, 这段看的乱乱的…
有空画个图梳理一下…
最后
以上就是爱撒娇诺言为你收集整理的Hadoop3.2.1 【 HDFS 】源码分析 : Datanode 存储[二]一. Storage二.DataStorage三. BlockPoolSliceStorage的全部内容,希望文章能够帮你解决Hadoop3.2.1 【 HDFS 】源码分析 : Datanode 存储[二]一. Storage二.DataStorage三. BlockPoolSliceStorage所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复