org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: Problem Analysis and Solution

Overview

Problem

A Spark Streaming job consumes Kafka data in real time. After aggregation, the data is repartitioned, and each partition writes its data to a Hive table; the file name written includes the batch time. The executor logs occasionally show the following error:

21/01/06 13:44:00 WARN DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/realTimeCal/xxxxxxx/temp (inode 663169524): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_2012746095_1, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3428)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1459)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1255)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
21/01/06 13:44:00 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to 'hdfs://ns1/tmp/realTimeCal/xxxxxxx/checkpoint-1609911840000'
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/realTimeCal/LiveECommerceOrder/temp (inode 663169524): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_2012746095_1, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3428)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3233)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3071)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1459)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1255)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
21/01/06 13:44:00 INFO CheckpointWriter: Saving checkpoint for time 1609911840000 ms to file 'hdfs://ns1/tmp/realTimeCal/xxxxxxx/checkpoint-1609911840000'
Exception in thread "pool-25-thread-74" java.lang.NullPointerException
at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Problem Analysis

The file name only encodes the batch time. Within one batch, the data is repartitioned into several partitions, so several tasks operate on the same batch file concurrently. HDFS allows only a single writer per file, enforced through leases; when one task deletes or recreates the shared file, another task's lease on it becomes invalid, the file it was writing can no longer be found, and the write fails with the error above. The original write helper looked like this:

def saveSqlData2Hive(time: Time, schemaJson: String, content: List[Map[String, Any]]): Unit = {
  // write the Kafka data to Hive
  val dt = new DateTime(time.milliseconds).toString("yyyyMMdd")
  val dataPathWithPartition = s"${sqlDataPath}/dt=${dt}"
  if (!HDFSOption.isExists(dataPathWithPartition)) {
    HiveOption.addPartition(sqlLogTableName, dt, dataPathWithPartition)
  }
  // the file name is derived from the batch time only, so every task
  // in the same batch targets the same file
  HDFSOption.writeToFile(dataPathWithPartition, s"data.avro.${time.milliseconds.toString}", schemaJson, content)
}
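
For context, the calling pattern looked roughly like the sketch below. This is a hypothetical reconstruction: the post only shows saveSqlData2Hive itself, so here the Kafka source is replaced by a socket stream and the HDFS write by a print stub. The point it illustrates is that every task in a batch derives the identical file name from the batch time alone:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SameFilePerBatch {
  // Stub standing in for the real HDFS write: the file name comes
  // from the batch time only, which is the collision point.
  def writeStub(time: Time, records: List[String]): Unit =
    println(s"writing data.avro.${time.milliseconds} (${records.size} records)")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("same-file-per-batch").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(60))
    // Placeholder source; the real job consumes Kafka.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { (rdd, time) =>
      // Four tasks run this closure concurrently, all targeting the
      // identical file name derived from `time`.
      rdd.repartition(4).foreachPartition { records =>
        writeStub(time, records.toList)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}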

Solution

When writing the file, append the partition ID (partitionId) to the file name. Each file is then written by exactly one task, which removes the contention and the error above:

def saveSqlData2Hive(time: Time, schemaJson: String, content: List[Map[String, Any]], partitionId: String): Unit = {
  // write the Kafka data to Hive
  val dt = new DateTime(time.milliseconds).toString("yyyyMMdd")
  val dataPathWithPartition = s"${sqlDataPath}/dt=${dt}"
  if (!HDFSOption.isExists(dataPathWithPartition)) {
    HiveOption.addPartition(sqlLogTableName, dt, dataPathWithPartition)
  }
  // appending partitionId makes the file name unique per task
  HDFSOption.writeToFile(dataPathWithPartition, s"data.avro.${time.milliseconds.toString}.${partitionId}", schemaJson, content)
}
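
At the call site, the partition ID can be taken from Spark's TaskContext. The sketch below shows one way to wire this up; the surrounding writeBatch wrapper and the toRecord parser are assumptions, since the post only shows the helper itself:

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Sketch of the corrected call site; toRecord is a hypothetical parser
// producing the Map[String, Any] rows that saveSqlData2Hive expects.
def writeBatch(rdd: RDD[String], time: Time, schemaJson: String,
               toRecord: String => Map[String, Any]): Unit = {
  rdd.repartition(4).foreachPartition { records =>
    // TaskContext.getPartitionId() returns this task's partition index,
    // unique within the stage, so each task now writes its own file.
    val partitionId = TaskContext.getPartitionId().toString
    saveSqlData2Hive(time, schemaJson, records.toList.map(toRecord), partitionId)
  }
}

Note that the partition index is only unique within a single batch; combined with the batch timestamp already present in the name, the full file name is unique across both batches and tasks.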

After making this change and restarting the job, the error no longer occurred.
