Overview
1. Problem description
When submitting a job to the cluster, the job needs to read a resource file from HDFS. While reading that resource file, the code fails with the following error:
2021-01-29 09:48:29,023 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:490)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:625)
at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:607)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:148)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:147)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:147)
at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:196)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:113)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:100)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:95)
2021-01-29 09:48:29,037 ERROR util.Utils: Uncaught exception in thread pool-1-thread-1
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:490)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1734)
at org.apache.hadoop.hdfs.DistributedFileSystem$31.doCall(DistributedFileSystem.java:1684)
at org.apache.hadoop.hdfs.DistributedFileSystem$31.doCall(DistributedFileSystem.java:1681)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1696)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1758)
at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:245)
at org.apache.spark.SparkContext$$anonfun$stop$8$$anonfun$apply$mcV$sp$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$stop$8$$anonfun$apply$mcV$sp$5.apply(SparkContext.scala:1944)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1944)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1943)
at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:587)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:238)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:210)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:210)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:210)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1997)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:210)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:210)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:210)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:210)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:181)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2. The code that copies the HDFS file to the local filesystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.{Level, Logger}
import org.slf4j
import org.slf4j.LoggerFactory

object EastDataReport {
  private val LOG: slf4j.Logger = LoggerFactory.getLogger(EastDataReport.getClass)

  // Copies a file from HDFS to the local filesystem; returns 0 on success, 1 on failure
  def copyHdfsDataToLocal(inHdfsPath: String, inLocalPath: String): Int = {
    var status = 0
    try {
      val conf = new Configuration()
      // conf.setBoolean("fs.hdfs.impl.disable.cache", true)
      conf.set("fs.default.name", "hdfs://xxx.hadoop:8020")
      val hdfsPath = new Path(inHdfsPath)
      val fs = hdfsPath.getFileSystem(conf)
      val localPath = new Path(inLocalPath)
      fs.copyToLocalFile(hdfsPath, localPath)
      // Closing the FileSystem here also closes the cached instance shared with Spark itself
      fs.close()
      status = 0
    } catch {
      case e: Exception =>
        e.printStackTrace()
        LOG.error(s"Failed, error msg ${e.getMessage}")
        status = 1
    }
    // Return the copy status to the caller
    status
  }

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger(EastDataReport.getClass.getName).setLevel(Level.INFO)
    // dt and file_suffix are placeholders for the report date and file suffix
    val dt = "20210129"
    val file_suffix = "csv"
    val outputHdfsPath = s"hdfs://xxx/report/data_detail/${dt}_${file_suffix}"
    println("outputHdfsPath: " + outputHdfsPath)
    println("Copying the csv written to HDFS down to the local filesystem")
    val localpath = "file:///home/work/xxx/project/report/data_detail/tmp/"
    val copyStatus = copyHdfsDataToLocal(outputHdfsPath, localpath)
    if (copyStatus == 0) {
      println("Copying the csv from HDFS to the local filesystem -- succeeded")
    } else {
      println("Copying the csv from HDFS to the local filesystem -- failed")
    }
  }
}
3. Root-cause analysis
Code that reads a file on HDFS is usually written against the FileSystem API like this:
public void initPackageNameTypeMap() throws IOException {
    Configuration conf = new Configuration();
    // FileSystem.get returns a cached instance shared by every caller using the same scheme/authority
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("filePath");
    // ... read the file via fs.open(file) ...
}
- The same FileSystem closed error also appears when executing insert statements on a Spark cluster, typically inside the Spark ThriftServer.
- Once the job is submitted to the cluster, every thread that calls getFileSystem with an identical Configuration gets back the very same FileSystem instance.
- Because FileSystem.get serves instances from a cache, as soon as one thread finishes its work and closes the connection, every other thread still using that instance hits the exception above (see the sketch after this list).
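To make the cache behaviour concrete, here is a minimal, self-contained sketch; FsCacheDemo is a hypothetical object name, and the namenode address and /tmp path are placeholders, not part of the original job. Both FileSystem.get calls return the same cached object, so closing it through one reference breaks every other holder:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val uri = new URI("hdfs://xxx.hadoop:8020")  // placeholder namenode address
    val fs1 = FileSystem.get(uri, conf)
    val fs2 = FileSystem.get(uri, conf)
    println(fs1 eq fs2)  // true: the cache is keyed on scheme + authority (+ user), so both are the same object
    fs1.close()          // closing through one reference closes it for every holder
    fs2.exists(new Path("/tmp"))  // now throws java.io.IOException: Filesystem closed
  }
}

This is exactly how Spark's EventLoggingListener ends up failing in section 1: the user code closed the very instance Spark was still writing event logs through.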
4. Solution
- 1) Change the configuration file: HDFS can be told not to serve FileSystem instances from the cache; set fs.hdfs.impl.disable.cache=true in hdfs-site.xml (a minimal snippet is shown at the end of this section).
- 2) Or set the same flag in code:
conf.setBoolean("fs.hdfs.impl.disable.cache", true)
The FileSystem class keeps a static CACHE holding one instance per file system. Whether an instance is cached is controlled by the property "fs.%s.impl.disable.cache" (with %s replaced by the scheme, e.g. hdfs, file, s3, s3n): once a FileSystem instance has been created it is stored in the cache, and every later get returns that same instance. Setting the property to true therefore gives each caller its own instance, which resolves the exception above.
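For option 1), a minimal hdfs-site.xml snippet would be (assuming the client-side configuration used to submit the job is the right place for it in your deployment):

<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
</property>

Note that with the cache disabled every get creates a fresh instance, so you are responsible for closing the instances you create yourself.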
References:
http://stackoverflow.com/questions/23779186/ioexception-filesystem-closed-exception-when-running-oozie-workflow
http://stackoverflow.com/questions/20057881/hadoop-filesystem-closed-exception-when-doing-bufferedreader-close
http://shift-alt-ctrl.iteye.com/blog/2108760