我是靠谱客的博主 虚幻皮皮虾,最近开发中收集的这篇文章主要介绍spark报错:java.io.IOException: Filesystem closed,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.问题描述

往集群提交任务的时候,需要在hdfs上面读取一个资源文件。在读取该资源文件的时候,代码报错如下:

2021-01-29 09:48:29,023 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:490)
	at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:625)
	at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:607)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:148)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:147)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:147)
	at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:196)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:113)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:100)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:95)
2021-01-29 09:48:29,037 ERROR util.Utils: Uncaught exception in thread pool-1-thread-1
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:490)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1734)
	at org.apache.hadoop.hdfs.DistributedFileSystem$31.doCall(DistributedFileSystem.java:1684)
	at org.apache.hadoop.hdfs.DistributedFileSystem$31.doCall(DistributedFileSystem.java:1681)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1696)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1758)
	at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:245)
	at org.apache.spark.SparkContext$$anonfun$stop$8$$anonfun$apply$mcV$sp$5.apply(SparkContext.scala:1944)
	at org.apache.spark.SparkContext$$anonfun$stop$8$$anonfun$apply$mcV$sp$5.apply(SparkContext.scala:1944)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1944)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1943)
	at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:587)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:238)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:210)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:210)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:210)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1997)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:210)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:210)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:210)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:210)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:181)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

2.调用拷贝hdfs文件到本地的代码

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.{Level, Logger}

/**
 * Example job that copies a report file from HDFS to the local file system
 * and prints whether the copy succeeded.
 */
object EastDataReport {
  // Fully qualified because the file only imports the log4j Logger; this keeps the
  // slf4j logger resolvable without touching the import block.
  private val LOG: org.slf4j.Logger = org.slf4j.LoggerFactory.getLogger(EastDataReport.getClass)

  /**
   * Copies a path from HDFS to the local file system.
   *
   * @param inHdfsPath  source path on HDFS
   * @param inLocalPath destination path on the local file system
   * @return 0 when the copy succeeded, 1 when any exception was raised
   */
  def copyHdfsDataToLocal(inHdfsPath: String, inLocalPath: String): Int = {
    // try/catch is an expression: each branch yields the status code directly.
    // (A value placed in a `finally` block is discarded, so it cannot be used
    // to return the status.)
    try {
      val conf = new Configuration()
      // "fs.default.name" is the deprecated spelling of this key.
      conf.set("fs.defaultFS", "hdfs://xxx.hadoop:8020")
      val hdfsPath = new Path(inHdfsPath)
      val localPath = new Path(inLocalPath)
      // Path.getFileSystem / FileSystem.get return a JVM-wide cached instance that
      // Spark also uses (e.g. for event logging). Closing that shared instance is
      // what produces "java.io.IOException: Filesystem closed" later on, so take
      // a private instance that is safe to close.
      val fs = org.apache.hadoop.fs.FileSystem.newInstance(hdfsPath.toUri, conf)
      try {
        fs.copyToLocalFile(hdfsPath, localPath)
      } finally {
        // Always release the private instance, even when the copy fails.
        fs.close()
      }
      0
    } catch {
      case e: Exception =>
        // Passing the exception to the logger records the stack trace as well.
        LOG.error(s"Failed , error msg ${e.getMessage}", e)
        1
    }
  }

  /** Entry point: configures log levels, then copies the report from HDFS to a local directory. */
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger(EastDataReport.getClass.getName).setLevel(Level.INFO)

    // NOTE(review): `dt` and `file_suffix` are not defined in this excerpt; they are
    // presumably computed by code omitted from the article - confirm before use.
    val outputHdfsPath = s"hdfs://xxx/report/data_detail/${dt}_${file_suffix}"
    // Interpolate instead of passing two arguments: println(a, b) prints a tuple.
    println(s"outputHdfsPath: $outputHdfsPath")
    println("将写入到hdfs上的csv拷贝到本地")
    val localpath = "file:///home/work/xxx/project/report/data_detail/tmp/"
    val copyStatus = copyHdfsDataToLocal(outputHdfsPath, localpath)
    if (copyStatus == 0) {
      println("将写入到hdfs上的csv拷贝到本地——成功")
    } else {
      println("将写入到hdfs上的csv拷贝到本地——失败")
    }
  }
}

3.问题的原因分析

一般读取hdfs上文件的api是这样写的:

public void initPackageNameTypeMap {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("filePath");
  • 在spark集群中执行 insert 语句时报错,堆栈信息为:FileSystem closed。常常出现在ThriftServer里面。
  • 当任务提交到集群上面以后,同一个JVM进程(driver或某个executor)内的多个线程在getFileSystem过程中,由于Configuration一样(scheme、authority和用户相同),会得到同一个FileSystem实例。
  • 由于hadoop FileSystem.get 获得的FileSystem会从缓存加载,如果其中一个线程在使用完后调用了close(),其它仍持有该实例的线程(例如本例堆栈中Spark自身的EventLoggingListener)再访问时就会出现上述异常。

   4.解决方案

  • 1).修改配置文件:

  hdfs存在不从缓存加载的解决方式,在hdfs-site.xml 配置 fs.hdfs.impl.disable.cache=true即可

  • 2).代码配置如下:

  conf.setBoolean("fs.hdfs.impl.disable.cache", true) 

         FileSystem类内部有一个static CACHE,用来保存每种文件系统的实例集合,FileSystem类中可以通过"fs.%s.impl.disable.cache"来指定是否禁用FileSystem实例的缓存(其中%s替换为相应的scheme,比如hdfs、local、s3、s3n等)。该配置默认为false,即缓存是开启的:一旦创建了相应的FileSystem实例,这个实例将会保存在缓存中,此后每次get都会获取同一个实例。设为true以后,每次get都会创建新的实例,关闭其中一个不会影响其它使用者,所以就能解决上面的异常。

参考链接:
http://stackoverflow.com/questions/23779186/ioexception-filesystem-closed-exception-when-running-oozie-workflow
http://stackoverflow.com/questions/20057881/hadoop-filesystem-closed-exception-when-doing-bufferedreader-close
http://shift-alt-ctrl.iteye.com/blog/2108760

 

最后

以上就是虚幻皮皮虾为你收集整理的spark报错:java.io.IOException: Filesystem closed的全部内容,希望文章能够帮你解决spark报错:java.io.IOException: Filesystem closed所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部