我是靠谱客的博主 靓丽灯泡,最近开发中收集的这篇文章主要介绍Filesystem closed报错问题处理,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

使用HDFS的时候

final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.get(URI.create(hdfsFile), conf);
final Path path = new Path(hdfsFile);
if (fs.exists(path)) {
	final FSDataInputStream is = fs.open(path);
	final FileStatus stat = fs.getFileStatus(path);
	final byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
	is.readFully(0, buffer);
	is.close();
	fs.close();
	return buffer;
} else {
	throw new Exception("the file is not found .");
}

在高并发情况下会报错:

java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interrupted while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.16.3.2:52305 remote=/10.16.3.2:59000]. 60000 millis timeout left.; Host Details : local host is: "hadoop-test/10.16.3.2"; destination host is: "hadoop-alone-test":59000; 
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy109.getListing(Unknown Source)

问题原因为:多线程访问问题,线程A、B同时获取filesystem后使用,线程B使用完后调用了filesystem.close()方法,这个时候线程A还在操作filesystem,所以报错上面种种异常

二、解决办法

禁用FileSystem缓存

Configuration conf = new Configuration();
conf.set("fs.hdfs.impl.disable.cache", "true");

 三、问题原因

FileSystem.get源码分析

那么明明使用了两个集群,为什么会使用到Cache呢,分析FileSystem.get源码便知道原因了

  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      LOGGER.debug("Bypassing cache to create filesystem {}", uri);
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

应用在获取FileSystem时,提供了完整的hdfs目录,同时没有设置fs.hdfs.impl.disable.cache为true,所以创建slave集群的filesystem对象时,会使用CACHE.get(uri, conf)获取,Cache内部使用一个HashMap来维护filesystem对象,很容易想到,当HashMap的key相同时,便返回了同一个filesystem对象,那么Cache中的key是什么样的呢,代码如下:

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }

 static class Key {
      final String scheme;
      final String authority;
      final UserGroupInformation ugi;
      final long unique;   // an artificial way to make a key unique

      Key(URI uri, Configuration conf) throws IOException {
        this(uri, conf, 0);
      }

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme()==null ?
            "" : StringUtils.toLowerCase(uri.getScheme());
        authority = uri.getAuthority()==null ?
            "" : StringUtils.toLowerCase(uri.getAuthority());
        this.unique = unique;

        this.ugi = UserGroupInformation.getCurrentUser();
      }

      @Override
      public int hashCode() {
        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
      }

      static boolean isEqual(Object a, Object b) {
        return a == b || (a != null && a.equals(b));
      }

      @Override
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme)
                 && isEqual(this.authority, that.authority)
                 && isEqual(this.ugi, that.ugi)
                 && (this.unique == that.unique);
        }
        return false;
      }

      @Override
      public String toString() {
        return "("+ugi.toString() + ")@" + scheme + "://" + authority;
      }
    }
  }

可以看到Key由四个要素构成,其中前2个跟URI相关,我们使用的为一个hdfs://nameservice1,ugi为安全认证的用户,使用的是同一个,unique为0,因此Key相同,第二次获取filesystem对象时,直接返回了第一次创建的filesystem对象,最终造成了应用虽然使用了不同的集群配置文件,但最中获取的是同一个filesystem对象。

解决

fs.hdfs.impl.disable.cache参数本身不建议修改,修改集群的fs.defaultFS,使不同集群的fs.defaultFS不一样

参考:

多个HDFS集群的fs.defaultFS配置一样,造成应用一直连接同一个集群的问题分析 - 远去的列车 - 博客园
Filesystem closed错误排查 - 简书
 

java.io.IOException: Filesystem closed - 简书

java.io.IOException: Filesystem closed_bitcarmanlee的博客-CSDN博客_filesystem closed

最后

以上就是靓丽灯泡为你收集整理的Filesystem closed报错问题处理的全部内容,希望文章能够帮你解决Filesystem closed报错问题处理所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部