我是靠谱客的博主 爱撒娇诺言,最近开发中收集的这篇文章主要介绍Hadoop3.2.1 【 HDFS 】源码分析 : Datanode 存储[二]一. Storage二.DataStorage三. BlockPoolSliceStorage,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

List of articles

  • 一. Storage
  • 二.DataStorage
  • 三. BlockPoolSliceStorage

一. Storage

Storage是一个抽象类, 为Datanode、 Namenode提供抽象的存储服务。 Storage类管理着当前节点上( 可以是Datanode或者Namenode) 所有的存储目录, 每个存储目录都由一个StorageDirectory对象管理, Storage用一个线性表字段storageDirs存储它管理的所有StorageDirectory, 并通过Dirlterator迭代器进行遍历。

  private final List<StorageDirectory> storageDirs =    new CopyOnWriteArrayList<>();

二.DataStorage

DataStorage继承自Storage抽象类, 提供了管理Datanode存储空间的功能。
在HDFSFederation架构中, 一个Datanode可以保存多个命名空间的数据块, 每个命名空间在Datanode磁盘上都拥有一个独立的块池( BlockPool) , 这个块池会分布在Datanode的所有存储目录下, 它们共同保存了这个块池在当前Datanode上的所有数据块。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间 , DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池BlockPoolSliceStorage对象的引用。

  // Maps block pool IDs to block pool storage
  private final Map<String, BlockPoolSliceStorage> bpStorageMap = Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>());

Datanode在启动时会调用DataStorage提供的方法初始化Datanode的存储空间, 在HDFS Federation架构中, Datanode会保存多个命名空间的数据块。 对于每一个命名空间,Datanode都会构造一个BPOfferService类维护与这个命名空间Namenode的通信 。 当BPOfferService中的BPServiceActor类与该命名空间的Namenode握手成功后, 就会调用DataNode.initBlockPool()初始化该命名空间的块池。 DataNode.initBlockPool()方法最终会调用DataStorage.recoverTransitionRead()来执行块池存储的初始化操作。

DataNode#initStorage初始化只在和第一个namenode的握手完成时完成一次。

/**
   * Initializes the {@link #data}. The initialization is done only once, when
   * handshake with the the first namenode is completed.
   */
  private void initStorage(final NamespaceInfo nsInfo) throws IOException {


    final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory  = FsDatasetSpi.Factory.getFactory(getConf());
    
    if (!factory.isSimulated()) {

      final StartupOption startOpt = getStartupOption(getConf());

      if (startOpt == null) {
        throw new IOException("Startup option not set.");
      }

      //bpid:  BP-451827885-192.168.8.156-1584099133244
      final String bpid = nsInfo.getBlockPoolID();

      //read storage info, lock data dirs and transition fs state if necessary
      synchronized (this) {


        storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
      }

      final StorageInfo bpStorage = storage.getBPStorage(bpid);



      LOG.info("Setting up storage: nsid={};bpid={};lv={};" +   "nsInfo={};dnuuid={}",
          bpStorage.getNamespaceID(), bpid, storage.getLayoutVersion(),
          nsInfo, storage.getDatanodeUuid());


    }

    // If this is a newly formatted DataNode then assign a new DatanodeUuid.
    checkDatanodeUuid();

    synchronized(this)  {
      if (data == null) {
        data = factory.newInstance(this, storage, getConf());
      }
    }
  }

在这里插入图片描述

recoverTransitionRead()方法调用了同名的重载方法recoverTransitionRead()初始化Datanode存储空间, 重载的recoverTransitionRead()方法会调用addStorageLocations()方法对Datanode存储空间进行初始化, 然后将DataStorage.initialized设置为true。


  /**
   *
   * 分析特定块池的存储目录。
   * 如果需要,从以前的转换中恢复。
   * 如果需要,根据命名空间信息执行fs状态转换。
   * 读取存储信息。
   * 此方法应该在多个DN线程之间同步。只有第一个DN线程执行DN级别的 storage dir recoverTransitionRead。
   *
   * Analyze storage directories for a specific block pool.
   * Recover from previous transitions if required.
   * Perform fs state transition if necessary depending on the namespace info.
   * Read storage info.
   * <br>
   * This method should be synchronized between multiple DN threads.  Only the
   * first DN thread does DN level storage dir recoverTransitionRead.
   *
   * @param datanode DataNode
   * @param nsInfo Namespace info of namenode corresponding to the block pool
   * @param dataDirs Storage directories
   * @param startOpt startup option
   * @throws IOException on error
   */
  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
      Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {

    // 调用addStorageLocations()方法对Datanode存储空间进行初始化
    if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
      throw new IOException("All specified directories have failed to load.");
    }
  }

在这里插入图片描述


  /**
   * 加载一个存储目录。如果需要,从以前的转换中恢复。
   *
   * Load one storage directory. Recover from previous transitions if required.
   *
   * @param nsInfo namespace information
   * @param dataDir the root path of the storage directory
   * @param startOpt startup option
   * @return the StorageDirectory successfully loaded.
   * @throws IOException
   */
  private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
          throws IOException {
    StorageDirectory sd = new StorageDirectory(
        nsInfo.getBlockPoolID(), null, true, location);
    try {

      // 调用analyzeStorage()方法分析当前StorageDirectory的状态
      StorageState curState = sd.analyzeStorage(startOpt, this, true);
      // sd is locked but not opened
      // 根据curState恢复状态
      switch (curState) {
      //存储目录状态正常, 不用执行任何操作
      case NORMAL:
        break;

      //对于不存在的情况, 则直接忽略
      case NON_EXISTENT:
        LOG.info("Block pool storage directory for location {} and block pool"
            + " id {} does not exist", location, nsInfo.getBlockPoolID());
        throw new IOException("Storage directory for location " + location +
            " and block pool id " + nsInfo.getBlockPoolID() +
            " does not exist");
      //没有格式化时, 调用format()方法格式化数据目录
      case NOT_FORMATTED: // format
        LOG.info("Block pool storage directory for location {} and block pool"
                + " id {} is not formatted. Formatting ...", location,
            nsInfo.getBlockPoolID());
        format(sd, nsInfo);
        break;
      default:  // recovery part is common
        //对于其他情况, 则调用StorageDirectory.doRecover()恢复到NORMAL状态
        sd.doRecover(curState);
      }

      // 2. Do transitions
      // Each storage directory is treated individually.
      // During startup some of them can upgrade or roll back
      // while others could be up-to-date for the regular startup.

      //调用doTransition()执行启动操作, 启动选项通过startOpt参数传递
      if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {

        // 3. Check CTime and update successfully loaded storage.
        if (getCTime() != nsInfo.getCTime()) {
          throw new IOException("Datanode CTime (=" + getCTime()
              + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
        }
        //3. 对于每一个成功执行的存储目录, 写入VERSION文件
        setServiceLayoutVersion(getServiceLayoutVersion());
        writeProperties(sd);
      }

      return sd;
    } catch (IOException ioe) {
      sd.unlock();
      throw ioe;
    }
  }

将当前存储目录恢复为正常状态之后, addStorageLocations()会调用doTransition()方法执行启动选项定义的操作。 doTransition()方法判断如果启动选项是ROLLBACK, 则调用doRollback()方法进行回滚操作。 如果存储目录记录的文件系统布局版本号(VERSION文件记录) 与内存中的版本号一致, 则Datanode正常启动; 如果存储目录记录的版本号小于内存中的版本号, 则调用doUpgrade()方法升级(注意layoutVersion为负数) 。


  /**
   *
   * 分析是否需要转换BP状态,必要时执行。
   *
   * Analyze whether a transition of the BP state is required and
   * perform it if necessary.
   * <br>
   * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime.
   * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime Regular
   * startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
   * 
   * @param sd storage directory <SD>/current/<bpid>
   * @param nsInfo namespace info
   * @param startOpt startup option
   * @return true if the new properties has been written.
   */
  private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
      Configuration conf) throws IOException {

    // 数据来自于外部存储 什么都不做
    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
      return false; // regular startup for PROVIDED storage directories
    }

    if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
      // 如果启动选项是ROLLBACK, 则调用doRollback()进行回滚操作
      Preconditions.checkState(!getTrashRootDir(sd).exists(),
          sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
          " both be present.");
      doRollback(sd, nsInfo); // rollback if applicable

    } else if (startOpt == StartupOption.ROLLBACK &&
        !sd.getPreviousDir().exists()) {
      // Restore all the files in the trash. The restored files are retained
      // during rolling upgrade rollback. They are deleted during rolling
      // upgrade downgrade.
      // 还原垃圾箱中的所有文件。在滚动升级回滚期间将保留还原的文件。它们在滚动升级降级期间被删除。
      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
      LOG.info("Restored {} block files from trash.", restored);
    }
    //检查升级是否成功
    readProperties(sd);
    checkVersionUpgradable(this.layoutVersion);


    assert this.layoutVersion >= HdfsServerConstants.DATANODE_LAYOUT_VERSION  : "Future version is not allowed";

    // 检测 NamespaceID 是否相等 , 不相等 则报错
    if (getNamespaceID() != nsInfo.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
          + nsInfo.getNamespaceID() + "; datanode namespaceID = "
          + getNamespaceID());
    }

    // 检测 blockpoolID 是否相等 , 不相等 则报错
    if (!blockpoolID.equals(nsInfo.getBlockPoolID())) {
      throw new IOException("Incompatible blockpoolIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode blockpoolID = "
          + nsInfo.getBlockPoolID() + "; datanode blockpoolID = "
          + blockpoolID);
    }

    // 如果版本相同 并且创建时间相同 正常启动
    // TODO 为什么要加一个验证创建时间 ??????
    if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION && this.cTime == nsInfo.getCTime()) {
      return false; // regular startup
    }


    if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
      LOG.info("Restored {} block files from trash " +
          "before the layout upgrade. These blocks will be moved to " +
          "the previous directory during the upgrade", restored);
    }

    //磁盘版本号小于代码版本号, 则调用doUpgrade()升级
    if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
        || this.cTime < nsInfo.getCTime()) {
      doUpgrade(sd, nsInfo, callables, conf); // upgrade
      return true;
    }

    //磁盘版本号大于Datanode支持的版本号, 则抛出异常
    // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
    // must shutdown
    throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
        + " CTime = " + this.getCTime()
        + " is newer than the namespace state: LV = "
        + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
  }

doTransition()方法会分析启动选项以及存储目录中的文件系统布局版本号, 然后分别调用doRollback()、 doUpgrade()进行回滚或者升级操作, 但是doTransition()方法中并没有执行提交操作(finalize) 的入口。 这是因为提交操作是Namenode通过心跳响应携带的名字节点指令触发doFinalize()方法执行的, 并不是通过Datanode启动选项触发的.

三. BlockPoolSliceStorage

在HDFS Federation架构中, 一个Datanode可以保存多个块池的数据块, 每个块池的数据块都会分布在Datanode所有的存储目录下。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间, DataStorage类则定义了bpStorageMap字段保存Datanode
上所有块池的BlockPoolSliceStorage对象的引用。

DataStorage类管理着整个Datanode的存储, 包括Datanode定义的多个存储目录。BlockPoolSliceStorage类则管理着一个块池的存储, 包括分布在Datanode的多个存储目录下的块池目录。

所以BlockPoolSliceStorage类的功能与DataStorage类基本类似, 包括以下几点。
■ 格式化一个新的块池存储空间。
■ 恢复块池的异常状态。
■ 在升级过程中保存上一版本的快照。
■ 回滚到上一版本。
■ 提交升级, 并删除上一版本的快照。

BlockPoolSliceStorage的入口方法也是recoverTransitionRead(), DataStorage.recoverTransitionRead()方法调用的, DataStorage.recoverTransitionRead()会首先执行Datanode存储的初始化操作, 然后调用BlockPoolSliceStorage.recoverTransitionRead()执行块池存储的初始化操作。 块池存储的升级、 回滚以及提交操作是由BlockPoolSliceStorage的doUpgrade()、 doRollback()以及
doFinalize()方法执行的, 这三个方法都是在块池目录下执行对应的操作。recoverTransitionRead()、 doUpgrade()、 doRollback()以及doFinalize()方法的实现逻辑与DataStorage的同名方法基本相同.

艾玛, 这段看的乱乱的…
有空画个图梳理一下…

最后

以上就是爱撒娇诺言为你收集整理的Hadoop3.2.1 【 HDFS 】源码分析 : Datanode 存储[二]一. Storage二.DataStorage三. BlockPoolSliceStorage的全部内容,希望文章能够帮你解决Hadoop3.2.1 【 HDFS 】源码分析 : Datanode 存储[二]一. Storage二.DataStorage三. BlockPoolSliceStorage所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(52)

评论列表共有 0 条评论

立即
投稿
返回
顶部