Resolving a Presto Exception When Querying Hudi

Preface

This post records how I resolved an exception Presto threw when querying a Hudi table. I am not yet very familiar with Presto; I simply ran into this problem while using it, so I am writing down both the fix and the process of finding it.

The Exception

2021-12-22T17:29:55.440+0800    ERROR   SplitRunner-101-126     com.facebook.presto.execution.executor.TaskExecutor     Error processing Split 20211222_092954_00047_8xk77.1.0.0-0 {path=hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet, start=0, length=435116, fileSize=435116, fileModifiedTime=1640165281695, hosts=[], database=test_dkl, table=test_hudi_1, nodeSelectionStrategy=NO_PREFERENCE, partitionName=<UNPARTITIONED>, s3SelectPushdownEnabled=false, cacheQuotaRequirement=CacheQuotaRequirement{cacheQuotaScope=GLOBAL, quota=Optional.empty}} (start = 9.271133085962032E9, wall = 49 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
        at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:92)
        at org.apache.hudi.hadoop.HoodieParquetInputFormat.getRecordReader(HoodieParquetInputFormat.java:216)
        at com.facebook.presto.hive.HiveUtil.createRecordReader(HiveUtil.java:266)
        at com.facebook.presto.hive.GenericHiveRecordCursorProvider.lambda$createRecordCursor$0(GenericHiveRecordCursorProvider.java:74)
        at com.facebook.presto.hive.authentication.UserGroupInformationUtils.lambda$executeActionInDoAs$0(UserGroupInformationUtils.java:29)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:360)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1726)
        at com.facebook.presto.hive.authentication.UserGroupInformationUtils.executeActionInDoAs(UserGroupInformationUtils.java:27)
        at com.facebook.presto.hive.authentication.DirectHdfsAuthentication.doAs(DirectHdfsAuthentication.java:38)
        at com.facebook.presto.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:81)
        at com.facebook.presto.hive.GenericHiveRecordCursorProvider.createRecordCursor(GenericHiveRecordCursorProvider.java:73)
        at com.facebook.presto.hive.HivePageSourceProvider.createHivePageSource(HivePageSourceProvider.java:478)
        at com.facebook.presto.hive.HivePageSourceProvider.createPageSource(HivePageSourceProvider.java:184)
        at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:63)
        at com.facebook.presto.split.PageSourceManager.createPageSource(PageSourceManager.java:80)
        at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:249)
        at com.facebook.presto.operator.Driver.processInternal(Driver.java:424)
        at com.facebook.presto.operator.Driver.lambda$processFor$9(Driver.java:307)
        at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:728)
        at com.facebook.presto.operator.Driver.processFor(Driver.java:300)
        at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
        at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
        at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:599)
        at com.facebook.presto.$gen.Presto_0_265_1_ad1fce6____20211222_010139_1.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: readDirect unsupported in RemoteBlockReader
        at org.apache.hadoop.hdfs.RemoteBlockReader.read(RemoteBlockReader.java:492)
        at org.apache.hadoop.hdfs.DFSInputStream$ByteBufferStrategy.doRead(DFSInputStream.java:789)
        at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:823)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:883)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:938)
        at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:143)
        at org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:81)
        at org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:90)
        at org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:75)
        at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
        at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)

Cause of the Exception

The cause is that none of the file's HDFS block replicas are on the same node as the Presto worker, so the read has to go to a remote DataNode. To reproduce the problem quickly, set the HDFS replication factor to 1. To change the replication factor:

Edit hdfs-site.xml:

    <property>
      <name>dfs.replication</name>
      <value>1</value>
    </property>

Note: changing the cluster configuration only changes the cluster-side default; if a client has its own hdfs-site.xml, remember to change it there as well.
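
Also note that dfs.replication only applies to files written after the change; files that already exist keep the replication they were created with. A minimal sketch for lowering the replication of an already-written file, using the Parquet file from this example (-w waits until the target replication is reached):

hdfs dfs -setrep -w 1 hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet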

To check which nodes a file's HDFS blocks are located on:

hdfs fsck  hdfs://cluster1/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet  -files -blocks -locations

Connecting to namenode via http://indata-192-168-44-163.indata.com:50070/fsck?ugi=hive&files=1&blocks=1&locations=1&path=%2Fwarehouse%2Ftablespace%2Fmanaged%2Fhive%2Ftest_dkl.db%2Ftest_hudi_1%2F069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet
FSCK started by hive (auth:KERBEROS_SSL) from /192.168.44.162 for path /warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet at Thu Dec 23 11:01:40 CST 2021
/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet 435116 bytes, replicated: replication=1, 1 block(s):  OK
0. BP-551234808-192.168.44.164-1628073819744:blk_1073859319_119251 len=435116 Live_repl=1  [DatanodeInfoWithStorage[192.168.44.163:1019,DS-50d15bb1-a076-4e5f-a75a-f729f2bb8db6,DISK]]


Status: HEALTHY
 Number of data-nodes:  3
 Number of racks:               1
 Total dirs:                    0
 Total symlinks:                0

Replicated Blocks:
 Total size:    435116 B
 Total files:   1
 Total blocks (validated):      1 (avg. block size 435116 B)
 Minimally replicated blocks:   1 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    1
 Average block replication:     1.0
 Missing blocks:                0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)

Erasure Coded Block Groups:
 Total size:    0 B
 Total files:   0
 Total block groups (validated):        0
 Minimally erasure-coded block groups:  0
 Over-erasure-coded block groups:       0
 Under-erasure-coded block groups:      0
 Unsatisfactory placement block groups: 0
 Average block group size:      0.0
 Missing block groups:          0
 Corrupt block groups:          0
 Missing internal blocks:       0
FSCK ended at Thu Dec 23 11:01:40 CST 2021 in 1 milliseconds


The filesystem under path '/warehouse/tablespace/managed/hive/test_dkl.db/test_hudi_1/069eddc2-f3bf-4efc-911b-3cb3aa523a8e-0_0-0-0_20211222172800.parquet' is HEALTHY

As the output shows, the file has a single replica on node 163, while the Presto worker that processed the split is on 162, so there is no local replica for it to read.

Solution

Edit the hdfs-site.xml configuration file under Presto's catalog directory and add:

    <property>
      <name>dfs.client.use.legacy.blockreader</name>
      <value>false</value>
    </property>

Restart the Presto service and the problem is resolved.
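
To double-check the fix, you can rerun the failing query through the Presto CLI. A sketch, assuming the CLI is installed as presto, the coordinator runs locally on port 8080, and the Hive connector catalog is named hive (the schema and table names are the ones from this example):

presto --server localhost:8080 --catalog hive --schema test_dkl --execute "select count(*) from test_hudi_1"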

How the Fix Was Located

I could not find a solution to this exact exception online; the closest match was a similar exception between Hive and Spark SQL: https://dongkelun.com/2018/05/20/hiveQueryException/. I first suspected a jar version conflict, but ended up solving the problem by reading the source code. The relevant excerpts follow (you can locate them from the exception stack trace).

RemoteBlockReader.read

  @Override
  public int read(ByteBuffer buf) throws IOException {
    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
  }

This method does nothing but throw the exception, and RemoteBlockReader is already deprecated. In the same package there is a RemoteBlockReader2, whose read method is fully usable:

  @Override
  public int read(ByteBuffer buf) throws IOException {
    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
      TraceScope scope = Trace.startSpan(
          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
      try {
        readNextPacket();
      } finally {
        scope.close();
      }
    }
    if (curDataSlice.remaining() == 0) {
      // we're at EOF now
      return -1;
    }

    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
    ByteBuffer writeSlice = curDataSlice.duplicate();
    writeSlice.limit(writeSlice.position() + nRead);
    buf.put(writeSlice);
    curDataSlice.position(writeSlice.position());

    return nRead;
  }

So can we make the code take the RemoteBlockReader2 path instead? Let's look one frame up the stack at the caller:

DFSInputStream$ByteBufferStrategy.doRead

    @Override
    public int doRead(BlockReader blockReader, int off, int len)
        throws ChecksumException, IOException {
      int oldpos = buf.position();
      int oldlimit = buf.limit();
      boolean success = false;
      try {
        int ret = blockReader.read(buf);
        success = true;
        updateReadStatistics(readStatistics, ret, blockReader);
        return ret;
      } finally {
        if (!success) {
          // Reset to original state so that retries work correctly.
          buf.position(oldpos);
          buf.limit(oldlimit);
        }
      } 
    }

Now look at how blockReader is initialized:

blockReader = new BlockReaderFactory(dfsClient.getConf()).
            setInetSocketAddress(targetAddr).
            setRemotePeerFactory(dfsClient).
            setDatanodeInfo(chosenNode).
            setStorageType(storageType).
            setFileName(src).
            setBlock(blk).
            setBlockToken(accessToken).
            setStartOffset(offsetIntoBlock).
            setVerifyChecksum(verifyChecksum).
            setClientName(dfsClient.clientName).
            setLength(blk.getNumBytes() - offsetIntoBlock).
            setCachingStrategy(curCachingStrategy).
            setAllowShortCircuitLocalReads(!shortCircuitForbidden).
            setClientCacheContext(dfsClient.getClientContext()).
            setUserGroupInformation(dfsClient.ugi).
            setConfiguration(dfsClient.getConfiguration()).
            build();

Then look at BlockReaderFactory.build:

  public BlockReader build() throws IOException {
    BlockReader reader = null;

    Preconditions.checkNotNull(configuration);
    if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
      if (clientContext.getUseLegacyBlockReaderLocal()) {
        reader = getLegacyBlockReaderLocal();
        if (reader != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(this + ": returning new legacy block reader local.");
          }
          return reader;
        }
      } else {
        reader = getBlockReaderLocal();
        if (reader != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(this + ": returning new block reader local.");
          }
          return reader;
        }
      }
    }
    if (conf.domainSocketDataTraffic) {
      reader = getRemoteBlockReaderFromDomain();
      if (reader != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": returning new remote block reader using " +
              "UNIX domain socket on " + pathInfo.getPath());
        }
        return reader;
      }
    }
    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
        "TCP reads were disabled for testing, but we failed to " +
        "do a non-TCP read.");
    return getRemoteBlockReaderFromTcp();
  }

Whether it goes through getRemoteBlockReaderFromDomain or getRemoteBlockReaderFromTcp, both end up calling getRemoteBlockReader:

    if (conf.useLegacyBlockReader) {
      return RemoteBlockReader.newBlockReader(fileName,
          block, token, startOffset, length, conf.ioBufferSize,
          verifyChecksum, clientName, peer, datanode,
          clientContext.getPeerCache(), cachingStrategy);
    } else {
      return RemoteBlockReader2.newBlockReader(
          fileName, block, token, startOffset, length,
          verifyChecksum, clientName, peer, datanode,
          clientContext.getPeerCache(), cachingStrategy);
    }

So when conf.useLegacyBlockReader is false, the read goes through RemoteBlockReader2. Now let's see where conf.useLegacyBlockReader comes from:

useLegacyBlockReader = conf.getBoolean(
          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);

DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT is false, so for the legacy reader to be chosen here, the key DFS_CLIENT_USE_LEGACY_BLOCKREADER must have been set to true somewhere in the configuration our client loads. We don't need to dig into why it is true; the question is whether we can override it back to false through configuration.

  public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
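
To see which value the Hadoop client on a given node actually resolves for this key, hdfs getconf can print it; note that it reads the hdfs-site.xml on that node's Hadoop classpath, which is not necessarily the copy under Presto's catalog directory:

hdfs getconf -confKey dfs.client.use.legacy.blockreader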

After adding dfs.client.use.legacy.blockreader = false to hdfs-site.xml and restarting Presto, the problem was gone. Done!
