Preface
This post records the fix for an exception hit while querying Hudi with Presto. I'm not very familiar with Presto yet; I simply ran into the problem while using it, so I'm writing down the solution and the process of finding it.
The Exception
2021-12-22T17:29:55.440+0800 ERROR SplitRunner-101-126 com.facebook.presto.execution.executor.TaskExecutor Error processing Split 20211222_092954_00047_8xk77.1.0.0-0 {path=hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet, start=0, length=435116, fileSize=435116, fileModifiedTime=1640165281695, hosts=[], database=test_dkl, table=test_hudi_1, nodeSelectionStrategy=NO_PREFERENCE, partitionName=<UNPARTITIONED>, s3SelectPushdownEnabled=false, cacheQuotaRequirement=CacheQuotaRequirement{cacheQuotaScope=GLOBAL, quota=Optional.empty}} (start = 9.271133085962032E9, wall = 49 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:92)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.getRecordReader(HoodieParquetInputFormat.java:216)
at com.facebook.presto.hive.HiveUtil.createRecordReader(HiveUtil.java:266)
at com.facebook.presto.hive.GenericHiveRecordCursorProvider.lambda$createRecordCursor$0(GenericHiveRecordCursorProvider.java:74)
at com.facebook.presto.hive.authentication.UserGroupInformationUtils.lambda$executeActionInDoAs$0(UserGroupInformationUtils.java:29)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1726)
at com.facebook.presto.hive.authentication.UserGroupInformationUtils.executeActionInDoAs(UserGroupInformationUtils.java:27)
at com.facebook.presto.hive.authentication.DirectHdfsAuthentication.doAs(DirectHdfsAuthentication.java:38)
at com.facebook.presto.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:81)
at com.facebook.presto.hive.GenericHiveRecordCursorProvider.createRecordCursor(GenericHiveRecordCursorProvider.java:73)
at com.facebook.presto.hive.HivePageSourceProvider.createHivePageSource(HivePageSourceProvider.java:478)
at com.facebook.presto.hive.HivePageSourceProvider.createPageSource(HivePageSourceProvider.java:184)
at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:63)
at com.facebook.presto.split.PageSourceManager.createPageSource(PageSourceManager.java:80)
at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:249)
at com.facebook.presto.operator.Driver.processInternal(Driver.java:424)
at com.facebook.presto.operator.Driver.lambda$processFor$9(Driver.java:307)
at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:728)
at com.facebook.presto.operator.Driver.processFor(Driver.java:300)
at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:599)
at com.facebook.presto.$gen.Presto_0_265_1_ad1fce6____20211222_010139_1.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: readDirect unsupported in RemoteBlockReader
at org.apache.hadoop.hdfs.RemoteBlockReader.read(RemoteBlockReader.java:492)
at org.apache.hadoop.hdfs.DFSInputStream$ByteBufferStrategy.doRead(DFSInputStream.java:789)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:823)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:883)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:938)
at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:143)
at org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:81)
at org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:90)
at org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:75)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
Cause of the Exception
The cause is that none of the file's HDFS block replicas lives on the same node as Presto. To reproduce the problem quickly, set the HDFS replication factor to 1. To change the replication factor, edit hdfs-site.xml:
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>
Note: changing the cluster configuration only changes the cluster-wide default. If the client has its own hdfs-site.xml, it needs the same change.
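Also note that dfs.replication only applies to newly written files. For an existing file or directory you can lower the replication in place with the standard HDFS CLI (the path below is the table directory from this example; -w waits until replication has actually been adjusted):
hdfs dfs -setrep -w 1 hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1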
To check which nodes a file's HDFS blocks are on:
hdfs fsck hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet -files -blocks -locations
Connecting to namenode via http://indata-192-168-44-163.indata.com:50070/fsck?ugi=hive&files=1&blocks=1&locations=1&path=%2Fwarehouse%2Ftablespace%2Fmanaged%2Fhive%2Ftest_dkl.db%2Ftest_hudi_1%2F069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet
FSCK started by hive (auth:KERBEROS_SSL) from /192.168.44.162 for path /warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet at Thu Dec 23 11:01:40 CST 2021
/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet 435116 bytes, replicated: replication=1, 1 block(s): OK
0. BP-551234808-192.168.44.164-1628073819744:blk_1073859319_119251 len=435116 Live_repl=1 [DatanodeInfoWithStorage[192.168.44.163:1019,DS-50d15bb1-a076-4e5f-a75a-f729f2bb8db6,DISK]]
Status: HEALTHY
Number of data-nodes: 3
Number of racks: 1
Total dirs: 0
Total symlinks: 0
Replicated Blocks:
Total size: 435116 B
Total files: 1
Total blocks (validated): 1 (avg. block size 435116 B)
Minimally replicated blocks: 1 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 1
Average block replication: 1.0
Missing blocks: 0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Erasure Coded Block Groups:
Total size: 0 B
Total files: 0
Total block groups (validated): 0
Minimally erasure-coded block groups: 0
Over-erasure-coded block groups: 0
Under-erasure-coded block groups: 0
Unsatisfactory placement block groups: 0
Average block group size: 0.0
Missing block groups: 0
Corrupt block groups: 0
Missing internal blocks: 0
FSCK ended at Thu Dec 23 11:01:40 CST 2021 in 1 milliseconds
The filesystem under path '/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet' is HEALTHY
As shown, the file's only replica is on node 163, while Presto is actually running on 162.
The Fix
Edit the hdfs-site.xml under the Presto catalog directory and add:
<property>
    <name>dfs.client.use.legacy.blockreader</name>
    <value>false</value>
</property>
Then restart the Presto service and the problem is resolved.
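To verify, re-run the previously failing query against the same table. A sketch with the Presto CLI, assuming a Hive connector catalog named hive (adjust the server address, catalog name, and any Kerberos options for your environment):
presto --server localhost:8080 --catalog hive --schema test_dkl --execute "select count(*) from test_hudi_1"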
How the Fix Was Found
Searching online turned up no solution for this exact exception, only a similar one between Hive and Spark SQL: https://dongkelun.com/2018/05/20/hiveQueryException/.
I suspected a jar version problem, so I turned to the source code (the stack trace above points to the relevant locations). Excerpts follow.
RemoteBlockReader.read
@Override
public int read(ByteBuffer buf) throws IOException {
  throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
This method does nothing but throw the exception we saw. Moreover, RemoteBlockReader is deprecated; the same package contains RemoteBlockReader2, whose read method actually works:
@Override
public int read(ByteBuffer buf) throws IOException {
  if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
    TraceScope scope = Trace.startSpan(
        "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
    try {
      readNextPacket();
    } finally {
      scope.close();
    }
  }
  if (curDataSlice.remaining() == 0) {
    // we're at EOF now
    return -1;
  }
  int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
  ByteBuffer writeSlice = curDataSlice.duplicate();
  writeSlice.limit(writeSlice.position() + nRead);
  buf.put(writeSlice);
  curDataSlice.position(writeSlice.position());
  return nRead;
}
So can we steer the code onto the RemoteBlockReader2 path? Let's look one frame up the stack, at DFSInputStream$ByteBufferStrategy.doRead:
@Override
public int doRead(BlockReader blockReader, int off, int len)
    throws ChecksumException, IOException {
  int oldpos = buf.position();
  int oldlimit = buf.limit();
  boolean success = false;
  try {
    int ret = blockReader.read(buf);
    success = true;
    updateReadStatistics(readStatistics, ret, blockReader);
    return ret;
  } finally {
    if (!success) {
      // Reset to original state so that retries work correctly.
      buf.position(oldpos);
      buf.limit(oldlimit);
    }
  }
}
Here is how blockReader is initialized:
blockReader = new BlockReaderFactory(dfsClient.getConf()).
    setInetSocketAddress(targetAddr).
    setRemotePeerFactory(dfsClient).
    setDatanodeInfo(chosenNode).
    setStorageType(storageType).
    setFileName(src).
    setBlock(blk).
    setBlockToken(accessToken).
    setStartOffset(offsetIntoBlock).
    setVerifyChecksum(verifyChecksum).
    setClientName(dfsClient.clientName).
    setLength(blk.getNumBytes() - offsetIntoBlock).
    setCachingStrategy(curCachingStrategy).
    setAllowShortCircuitLocalReads(!shortCircuitForbidden).
    setClientCacheContext(dfsClient.getClientContext()).
    setUserGroupInformation(dfsClient.ugi).
    setConfiguration(dfsClient.getConfiguration()).
    build();
Now look at BlockReaderFactory.build:
public BlockReader build() throws IOException {
  BlockReader reader = null;
  Preconditions.checkNotNull(configuration);
  if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
    if (clientContext.getUseLegacyBlockReaderLocal()) {
      reader = getLegacyBlockReaderLocal();
      if (reader != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": returning new legacy block reader local.");
        }
        return reader;
      }
    } else {
      reader = getBlockReaderLocal();
      if (reader != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": returning new block reader local.");
        }
        return reader;
      }
    }
  }
  if (conf.domainSocketDataTraffic) {
    reader = getRemoteBlockReaderFromDomain();
    if (reader != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": returning new remote block reader using " +
            "UNIX domain socket on " + pathInfo.getPath());
      }
      return reader;
    }
  }
  Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
      "TCP reads were disabled for testing, but we failed to " +
      "do a non-TCP read.");
  return getRemoteBlockReaderFromTcp();
}
Both getRemoteBlockReaderFromDomain and getRemoteBlockReaderFromTcp end up calling getRemoteBlockReader:
if (conf.useLegacyBlockReader) {
  return RemoteBlockReader.newBlockReader(fileName,
      block, token, startOffset, length, conf.ioBufferSize,
      verifyChecksum, clientName, peer, datanode,
      clientContext.getPeerCache(), cachingStrategy);
} else {
  return RemoteBlockReader2.newBlockReader(
      fileName, block, token, startOffset, length,
      verifyChecksum, clientName, peer, datanode,
      clientContext.getPeerCache(), cachingStrategy);
}
So when conf.useLegacyBlockReader is false, the code takes the RemoteBlockReader2 path. Next, look at how conf.useLegacyBlockReader is set:
useLegacyBlockReader = conf.getBoolean(
    DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
    DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT is false, yet the legacy reader was being used, so dfs.client.use.legacy.blockreader must have been set to true somewhere in our configuration. There's no need to dig into why it was set to true; instead, let's see whether we can override it back to false through configuration:
public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
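As a minimal sketch of why putting the property in the catalog's hdfs-site.xml works (the file path below is hypothetical; Configuration.addResource and getBoolean are the standard Hadoop APIs involved): a resource added to a Configuration overrides the built-in default, and the DFS client performs exactly this lookup.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class LegacyBlockReaderCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical location; point this at the Presto catalog's hdfs-site.xml.
    conf.addResource(new Path("/etc/presto/catalog/hdfs-site.xml"));
    // The same lookup the DFS client performs; the code default is false, so
    // the legacy RemoteBlockReader is only chosen if some hdfs-site.xml on
    // the classpath (or added above) sets the key to true.
    boolean useLegacy = conf.getBoolean("dfs.client.use.legacy.blockreader", false);
    System.out.println("dfs.client.use.legacy.blockreader = " + useLegacy);
  }
}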
We added dfs.client.use.legacy.blockreader = false to the hdfs-site.xml, restarted Presto, and the problem was gone. Done!