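I'm 震动河马, a blogger at 靠谱客. This article, which I came across during recent development work, covers real-time queries in druid.io index_realtime tasks. I found it quite good and am sharing it here in the hope that it serves as a useful reference.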

Overview

Preface

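Earlier articles covered the realtime task workflow and the details of some of its stages, but did not explain how realtime queries are supported. Two points come to mind right away: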

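  1. The realtime task is responsible both for persisting data and for answering queries.
  2. The realtime task uses both JVM heap memory and off-heap memory.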

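So this article explores how the realtime task manages this. Put another way: how would you design something that ingests data in real time and keeps it queryable at the same time?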

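Here is an overview diagram.

With a basic knowledge of druid.io, it is enough to get a rough feel for the flow here; the source code walkthrough follows.
[Figure: overview diagram]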

Source code walkthrough

Back to the druid.io source. In the peon process, io.druid.segment.realtime.plumber.RealtimePlumber#add receives every incoming row. The source is as follows:

@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
  log.info("add row:" + row.toString()); // debug log added by the author (logs every row)
  long messageTimestamp = row.getTimestampFromEpoch();
  final Sink sink = getSink(messageTimestamp);
  metrics.reportMessageMaxTimestamp(messageTimestamp);
  if (sink == null) {
    return -1;
  }

  final int numRows = sink.add(row, false);
  // Persist when the sink can no longer accept rows or the flush deadline (nextFlush) has passed
  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }

  return numRows;
}
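The persist trigger in add() can be isolated into a small model. The sketch below is hypothetical and not Druid code: `FlushPolicy`, `maxRowsInMemory` and `persistPeriodMillis` are illustrative names standing in for `sink.canAppendRow()` and the `nextFlush` deadline, the clock is passed in explicitly so the behaviour is deterministic, and clearing the buffer stands in for swapping the hydrant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the persist trigger in RealtimePlumber#add:
// persist when the in-memory buffer is full OR the flush deadline has passed.
class FlushPolicy {
    private final int maxRowsInMemory;      // stands in for the Sink's row limit (canAppendRow)
    private final long persistPeriodMillis; // stands in for the period used to compute nextFlush
    private final List<String> inMemory = new ArrayList<>();
    private long nextFlush;
    private int persistCount = 0;

    FlushPolicy(int maxRowsInMemory, long persistPeriodMillis, long nowMillis) {
        this.maxRowsInMemory = maxRowsInMemory;
        this.persistPeriodMillis = persistPeriodMillis;
        this.nextFlush = nowMillis + persistPeriodMillis;
    }

    /** Adds a row and returns the number of rows in memory right after the add. */
    int add(String row, long nowMillis) {
        inMemory.add(row);
        int numRows = inMemory.size();
        boolean canAppendRow = inMemory.size() < maxRowsInMemory;
        if (!canAppendRow || nowMillis > nextFlush) {
            persist(nowMillis);
        }
        return numRows;
    }

    private void persist(long nowMillis) {
        persistCount++;
        inMemory.clear();                            // real code swaps the hydrant instead
        nextFlush = nowMillis + persistPeriodMillis; // corresponds to resetNextFlush()
    }

    int persistCount() { return persistCount; }

    int rowsInMemory() { return inMemory.size(); }
}
```

Note that, like the original, the return value is computed before a possible persist, so a caller sees the row count that triggered the flush.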

The log below comes from an actual run; I added some log statements of my own.
[Figure: log output]

The Sink object is particularly important. io.druid.segment.realtime.plumber.RealtimePlumber#getSink looks like this:

 private Sink getSink(long timestamp)
  {
    if (!rejectionPolicy.accept(timestamp)) {
      return null;
    }

    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
    final VersioningPolicy versioningPolicy = config.getVersioningPolicy();

    DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(timestamp));
    final long truncatedTime = truncatedDateTime.getMillis();

    Sink retVal = sinks.get(truncatedTime);

    if (retVal == null) {
      final Interval sinkInterval = new Interval(
          truncatedDateTime,
          segmentGranularity.increment(truncatedDateTime)
      );
      log.info("RealtimePlumber.getSink interval:" + sinkInterval.toString()); // debug log added by the author
      retVal = new Sink(
          sinkInterval,
          schema,
          config.getShardSpec(),
          versioningPolicy.getVersion(sinkInterval),
          config.getMaxRowsInMemory(),
          config.isReportParseExceptions()
      );
      addSink(retVal);

    }

    return retVal;
  }
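  1. The sink is looked up in a Map<Long, Sink> sinks keyed by the truncated (bucket start) timestamp; if it is missing, a new Sink is constructed and put into the map, so later rows for the same interval reuse it instead of creating a new Sink.
  2. Tasks created by Tranquility usually have hourly granularity, so sinks typically contains just one key-value pair.
  3. Every row goes through sink.add(row, false), and after each add the plumber checks whether a persist is needed.
  4. In addition, io.druid.segment.realtime.plumber.RealtimePlumber#addSink announces the temporary (in-progress) segment to ZooKeeper.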
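The lookup-or-create logic of getSink can be sketched as follows, assuming hourly segment granularity in UTC. `SinkRegistry`, its nested `Sink` and the `minAcceptedTimestamp` check are hypothetical simplifications of Druid's Sink, `segmentGranularity.bucketStart(...)` and the rejection policy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified version of RealtimePlumber#getSink: rows are routed to a
// per-interval sink keyed by the start of their hourly bucket (UTC epoch millis).
class SinkRegistry {
    static final long HOUR_MILLIS = 3_600_000L;

    /** Minimal stand-in for Druid's Sink: only the interval it covers. */
    static final class Sink {
        final long intervalStart;
        final long intervalEnd;

        Sink(long start, long end) {
            this.intervalStart = start;
            this.intervalEnd = end;
        }
    }

    private final Map<Long, Sink> sinks = new HashMap<>();
    private final long minAcceptedTimestamp; // crude stand-in for the rejection policy

    SinkRegistry(long minAcceptedTimestamp) {
        this.minAcceptedTimestamp = minAcceptedTimestamp;
    }

    /** Returns the sink for this timestamp, creating it on first use; null if rejected. */
    Sink getSink(long timestamp) {
        if (timestamp < minAcceptedTimestamp) {
            return null; // like rejectionPolicy.accept(...) returning false
        }
        // Truncate to the start of the hour, like segmentGranularity.bucketStart(...)
        long bucketStart = Math.floorDiv(timestamp, HOUR_MILLIS) * HOUR_MILLIS;
        return sinks.computeIfAbsent(bucketStart, start -> new Sink(start, start + HOUR_MILLIS));
    }

    int sinkCount() { return sinks.size(); }
}
```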

As you can see, the Sink class is really the core here: both data processing and querying revolve around it.

Following the persist method further, the source is as follows (with comments added):

  • io.druid.segment.realtime.plumber.RealtimePlumber#persist
@Override
  public void persist(final Committer committer)
  {
    final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
    for (Sink sink : sinks.values()) {
      if (sink.swappable()) {
        // Add currHydrant to indexesToPersist, then create a new FireHydrant and make it the new currHydrant
        indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
      }
    }

    log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());

    final Stopwatch runExecStopwatch = Stopwatch.createStarted();
    final Stopwatch persistStopwatch = Stopwatch.createStarted();

    final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
                                              ImmutableMap.of(
                                                  COMMIT_METADATA_KEY,
                                                  committer.getMetadata(),
                                                  COMMIT_METADATA_TIMESTAMP_KEY,
                                                  System.currentTimeMillis()
                                              );

    persistExecutor.execute(
        new ThreadRenamingRunnable(StringUtils.format("%s-incremental-persist", schema.getDataSource()))
        {
          @Override
          public void doRun()
          {
            /* Note:
            If plumber crashes after storing a subset of all the hydrants then we will lose data and next
            time we will start with the commitMetadata stored in those hydrants.
            option#1:
            maybe it makes sense to store the metadata outside the segments in a separate file. This is because the
            commit metadata isn't really associated with an individual segment-- it's associated with a set of segments
            that are persisted at the same time or maybe whole datasource. So storing it in the segments is asking for problems.
            Sort of like this:

            {
              "metadata" : {"foo": "bar"},
              "segments": [
                {"id": "datasource_2000_2001_2000_1", "hydrant": 10},
                {"id": "datasource_2001_2002_2001_1", "hydrant": 12},
              ]
            }
            When a realtime node crashes and starts back up, it would delete any hydrants numbered higher than the
            ones in the commit file.

            option#2
            We could also just include the set of segments for the same chunk of metadata in more metadata on each
            of the segments. we might also have to think about the hand-off in terms of the full set of segments being
            handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing
            off and the others fail, the real-time would believe that it needs to re-ingest the data).
             */
            long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime();
            try {
              // Persist every hydrant in indexesToPersist; the number of persisted rows is collected as a metric
              for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
                metrics.incrementRowOutputCount(
                    persistHydrant(
                        pair.lhs, schema, pair.rhs, metadataElems
                    )
                );
              }
              committer.run();
            }
            catch (Exception e) {
              metrics.incrementFailedPersists();
              throw e;
            }
            finally {
              metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime);
              metrics.incrementNumPersists();
              metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
              persistStopwatch.stop();
            }
          }
        }
    );

    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
    metrics.incrementPersistBackPressureMillis(startDelay);
    if (startDelay > WARN_DELAY) {
      log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
    }
    runExecStopwatch.stop();
    // Reset nextFlush
    resetNextFlush();
  }
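The essence of persist() is a cheap swap on the ingestion thread followed by a slow write on a background thread. A minimal sketch, assuming a single background thread (a choice made for this sketch); `SwappingBuffer` and its methods are hypothetical, and an in-memory list stands in for the files written by persistHydrant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the swap-then-persist-asynchronously pattern in RealtimePlumber#persist.
class SwappingBuffer {
    private List<String> current = new ArrayList<>();               // plays the role of currHydrant
    private final List<List<String>> persisted = new ArrayList<>(); // stands in for on-disk hydrants
    private final ExecutorService persistExecutor = Executors.newSingleThreadExecutor();

    synchronized void add(String row) {
        current.add(row);
    }

    /** Swaps out the current buffer and hands it to the background thread; false if nothing to persist. */
    boolean persist() {
        final List<String> toPersist;
        synchronized (this) {
            if (current.isEmpty()) {
                return false;             // nothing to swap, loosely like sink.swappable() == false
            }
            toPersist = current;
            current = new ArrayList<>();  // ingestion continues into a fresh buffer immediately
        }
        persistExecutor.execute(() -> {
            synchronized (persisted) {
                persisted.add(toPersist); // real code writes a segment directory to disk here
            }
        });
        return true;
    }

    synchronized int inMemoryRows() {
        return current.size();
    }

    int persistedCount() {
        synchronized (persisted) {
            return persisted.size();
        }
    }

    /** Stops accepting persists and waits for pending ones to finish. */
    void close() {
        persistExecutor.shutdown();
        try {
            persistExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The sketch's executor has an unbounded queue, so add() never waits. In the Druid code above, the time spent inside persistExecutor.execute(...) is recorded as back pressure, which is how a slow persist ends up throttling ingestion.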
  • io.druid.segment.realtime.plumber.RealtimePlumber#persistHydrant
/**
  * Persists the given hydrant and returns the number of rows persisted
  *
  * @param indexToPersist hydrant to persist
  * @param schema         datasource schema
  * @param interval       interval to persist
  *
  * @return the number of rows persisted
  */
 protected int persistHydrant(
     FireHydrant indexToPersist,
     DataSchema schema,
     Interval interval,
     Map<String, Object> metadataElems
 )
 {
   synchronized (indexToPersist) {
     if (indexToPersist.hasSwapped()) {
       log.info(
           "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
           schema.getDataSource(), interval, indexToPersist
       );
       return 0;
     }

     log.info(
         "DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]",
         schema.getDataSource(),
         interval,
         metadataElems,
         indexToPersist
     );
     try {
       int numRows = indexToPersist.getIndex().size();

       final IndexSpec indexSpec = config.getIndexSpec();

       indexToPersist.getIndex().getMetadata().putAll(metadataElems);
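       /**
        * indexMerger is IndexMergerV9; the persist writes segment files under the task's configured basePersistDirectory.
        * Internally this goes through io.druid.segment.IndexMergerV9#merge, applied to this single in-memory index;
        * the persisted hydrants are only merged with each other later, at handoff.
        */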
       final File persistedFile = indexMerger.persist(
           indexToPersist.getIndex(),
           interval,
           new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
           indexSpec,
           config.getSegmentWriteOutMediumFactory()
       );
       /**
        * indexToPersist must stay queryable: build a new queryable segment, a QueryableIndexSegment,
        * from the newly written persistedFile and swap it in
        */
       indexToPersist.swapSegment(
           new QueryableIndexSegment(
               indexToPersist.getSegmentIdentifier(),
               indexIO.loadIndex(persistedFile)
           )
       );
       return numRows;
     }
     catch (IOException e) {
       log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
          .addData("interval", interval)
          .addData("count", indexToPersist.getCount())
          .emit();

       throw Throwables.propagate(e);
     }
   }
 }
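The synchronized persist-then-swap in persistHydrant can be modelled with plain NIO. In this hypothetical sketch, `SketchHydrant` stores long values instead of an IncrementalIndex, writes them to a file, and then swaps in a read-only `MappedByteBuffer` in place of `indexIO.loadIndex(...)`; a second persist request is ignored, mirroring the `hasSwapped()` check.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of FireHydrant + persistHydrant: rows start on the JVM heap and,
// once persisted, are swapped for a read-only memory-mapped file (outside the heap).
class SketchHydrant {
    private final List<Long> heapRows = new ArrayList<>();
    private LongBuffer mapped; // non-null once swapped

    synchronized void add(long value) {
        if (mapped != null) {
            throw new IllegalStateException("hydrant already swapped");
        }
        heapRows.add(value);
    }

    synchronized boolean hasSwapped() {
        return mapped != null;
    }

    /** Writes the heap rows to file, then swaps in the mmapped copy. Returns the rows persisted. */
    synchronized int persist(Path file) {
        if (mapped != null) {
            return 0; // already swapped: ignore the request, as persistHydrant does
        }
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ByteBuffer buf = ByteBuffer.allocate(heapRows.size() * Long.BYTES);
            for (long v : heapRows) {
                buf.putLong(v);
            }
            buf.flip();
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            // The mapping stays valid after the channel is closed.
            MappedByteBuffer mbb = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            int numRows = heapRows.size();
            mapped = mbb.asLongBuffer();
            heapRows.clear(); // the on-heap copy can now be garbage collected
            return numRows;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Sums all rows, whether they live on the heap or in the mapped file. */
    synchronized long sum() {
        long total = 0;
        if (mapped != null) {
            LongBuffer view = mapped.duplicate(); // independent position, shared content
            while (view.hasRemaining()) {
                total += view.get();
            }
        } else {
            for (long v : heapRows) {
                total += v;
            }
        }
        return total;
    }
}
```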

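At this point we can answer how a query uses JVM heap and off-heap memory:

  1. On the JVM heap, currHydrant holds the data that has not been persisted yet.
  2. Off the heap are the persisted files: each persistedFile written by an incremental persist is a small segment loaded with mmap (indexIO.loadIndex), so its data lives outside the JVM heap.

Clearly, merging the results from 1 and 2 yields the final answer, and that is exactly how a query works. The core class here is FireHydrant, and the core of a not-yet-swapped FireHydrant is its IncrementalIndex (OnheapIncrementalIndex or OffheapIncrementalIndex).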
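The query side of this can be sketched as merging per-hydrant partial results, some computed from heap data and some from memory outside the heap. Everything here is hypothetical: `HydrantMergeSketch` computes a row count and a sum per hydrant, and a direct `ByteBuffer` stands in for an mmapped segment file.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: one query over a sink = per-hydrant partial results, merged.
class HydrantMergeSketch {
    /** A queryable unit; Druid uses Segment and QueryRunner for this role. */
    interface Hydrant {
        long[] countAndSum(); // partial result: {rowCount, sum}
    }

    /** Not-yet-persisted hydrant: rows live on the JVM heap. */
    static Hydrant onHeap(long... rows) {
        long[] copy = rows.clone();
        return () -> new long[] {copy.length, Arrays.stream(copy).sum()};
    }

    /** Persisted hydrant: rows live outside the heap (a direct buffer stands in for an mmapped file). */
    static Hydrant offHeap(long... rows) {
        ByteBuffer direct = ByteBuffer.allocateDirect(rows.length * Long.BYTES);
        for (long v : rows) {
            direct.putLong(v);
        }
        direct.flip();
        return () -> {
            ByteBuffer view = direct.duplicate(); // do not disturb the shared buffer's position
            long sum = 0;
            int n = 0;
            while (view.remaining() >= Long.BYTES) {
                sum += view.getLong();
                n++;
            }
            return new long[] {n, sum};
        };
    }

    /** Merges per-hydrant partials, loosely analogous to factory.mergeRunners(...). */
    static long[] query(List<Hydrant> hydrants) {
        long count = 0;
        long sum = 0;
        for (Hydrant h : hydrants) {
            long[] partial = h.countAndSum();
            count += partial[0];
            sum += partial[1];
        }
        return new long[] {count, sum};
    }
}
```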


druid.io executes queries through the QueryRunner interface; on top of it, the QuerySegmentWalker interface provides the QueryRunner for a given Query object:

/**
 */
public interface QuerySegmentWalker
{
  /**
   * Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s)
   * such that it represents the interval.
   *
   * @param <T> query result type
   * @param query the query to find a Queryable for
   * @param intervals the intervals to find a Queryable for
   * @return a Queryable object that represents the interval
   */
  <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals);

  /**
   * Gets the Queryable for a given list of SegmentSpecs.
   *
   * @param <T> the query result type
   * @param query the query to return a Queryable for
   * @param specs the list of SegmentSpecs to find a Queryable for
   * @return the Queryable object with the given SegmentSpecs
   */
  <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
}
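A heavily simplified model of the QueryRunner / QuerySegmentWalker split may help: the walker decides which sinks overlap the query intervals and hands back a single runner that combines their results. `WalkerSketch`, `Runner` and `Interval` are hypothetical stand-ins, not Druid's classes, and concatenating lists replaces real result merging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, heavily simplified model of QueryRunner / QuerySegmentWalker.
class WalkerSketch {
    /** Stand-in for QueryRunner<T>: runs a query and returns its results. */
    interface Runner<T> {
        List<T> run();
    }

    /** Half-open interval [start, end) in epoch millis. */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean overlaps(Interval o) {
            return start < o.end && o.start < end;
        }
    }

    private final TreeMap<Long, List<String>> sinks = new TreeMap<>(); // sink start -> rows
    private final long granularity;

    WalkerSketch(long granularity) {
        this.granularity = granularity;
    }

    void add(long timestamp, String row) {
        long start = Math.floorDiv(timestamp, granularity) * granularity;
        sinks.computeIfAbsent(start, k -> new ArrayList<>()).add(row);
    }

    /** Like getQueryRunnerForIntervals: one runner per overlapping sink, combined into one. */
    Runner<String> getRunnerForIntervals(List<Interval> intervals) {
        List<Runner<String>> perSink = new ArrayList<>();
        for (Long start : sinks.keySet()) {
            Interval sinkInterval = new Interval(start, start + granularity);
            for (Interval q : intervals) {
                if (sinkInterval.overlaps(q)) {
                    List<String> rows = sinks.get(start);
                    perSink.add(() -> new ArrayList<>(rows));
                    break;
                }
            }
        }
        return () -> {
            List<String> out = new ArrayList<>();
            for (Runner<String> r : perSink) {
                out.addAll(r.run());
            }
            return out;
        };
    }
}
```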

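Its implementations are listed below; some of them are clearly related to realtime tasks.

[Figure: implementations of QuerySegmentWalker]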

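After adding logging and running a query, these are indeed the implementations that get used:

io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker#getQueryRunnerForIntervals

[Figure: SinkQuerySegmentWalker#getQueryRunnerForIntervals]
The query log:
[Figure: query log]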
Following the code further, we reach io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker#getQueryRunnerForSegments:

return new SpecificSegmentQueryRunner<>(
    withPerSinkMetrics(
        new BySegmentQueryRunner<>(
            sinkSegmentIdentifier,
            descriptor.getInterval().getStart(),
            factory.mergeRunners(
                MoreExecutors.sameThreadExecutor(),
                Iterables.transform(
                    theSink,
                    new Function<FireHydrant, QueryRunner<T>>()
                    {
                      @Override
                      public QueryRunner<T> apply(final FireHydrant hydrant)
                      {
                        // Hydrant might swap at any point, but if it's swapped at the start
                        // then we know it's *definitely* swapped.
                        final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();

                        if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
                          return new NoopQueryRunner<>();
                        }

                        // Prevent the underlying segment from swapping when its being iterated
                        final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
                        log.info("hydrant.getAndIncrementSegment():" + hydrant); // debug log added by the author
                        try {
                          QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
                              factory.createRunner(segment.lhs),
                              segment.rhs
                          );

                          // 1) Only use caching if data is immutable
                          // 2) Hydrants are not the same between replicas, make sure cache is local
                          if (hydrantDefinitelySwapped && cache.isLocal()) {
                            return new CachingQueryRunner<>(
                                makeHydrantCacheIdentifier(hydrant),
                                descriptor,
                                objectMapper,
                                cache,
                                toolChest,
                                baseRunner,
                                MoreExecutors.sameThreadExecutor(),
                                cacheConfig
                            );
                          } else {
                            return baseRunner;
                          }
                        }
                        catch (RuntimeException e) {
                          CloseQuietly.close(segment.rhs);
                          throw e;
                        }
                      }
                    }
                )
            )
        ),
        toolChest,
        sinkSegmentIdentifier,
        cpuTimeAccumulator
    ),
    new SpecificSegmentSpec(descriptor)
);
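Two ideas in this method can be isolated: which kind of runner a hydrant gets, and reference counting that keeps a segment from being swapped out while a query reads it. The sketch is hypothetical; `RunnerKind`, `choose`, `RefCountedSegment` and `Handle` are illustrative names, loosely mirroring the NoopQueryRunner / CachingQueryRunner branches and hydrant.getAndIncrementSegment().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of two ideas from SinkQuerySegmentWalker#getQueryRunnerForSegments.
class HydrantQuerySketch {
    enum RunnerKind { NOOP, PLAIN, CACHING }

    /** A handle whose close() does not throw, so it works cleanly with try-with-resources. */
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    /** Chooses how a hydrant is queried, mirroring the branches in the walker. */
    static RunnerKind choose(boolean hydrantSwapped, boolean skipIncrementalSegment, boolean cacheIsLocal) {
        if (skipIncrementalSegment && !hydrantSwapped) {
            return RunnerKind.NOOP;    // in-memory (incremental) data is skipped
        }
        if (hydrantSwapped && cacheIsLocal) {
            return RunnerKind.CACHING; // persisted data is immutable, so results may be cached locally
        }
        return RunnerKind.PLAIN;       // mutable data or a non-local cache: no caching
    }

    /** Reference counting that keeps a segment alive while a query is reading it. */
    static final class RefCountedSegment {
        private final AtomicInteger refs = new AtomicInteger();

        /** Like getAndIncrementSegment(): the returned handle must be closed when the query is done. */
        Handle acquire() {
            refs.incrementAndGet();
            AtomicBoolean closed = new AtomicBoolean(false);
            return () -> {
                if (closed.compareAndSet(false, true)) {
                    refs.decrementAndGet(); // release exactly once, even if closed twice
                }
            };
        }

        int references() {
            return refs.get();
        }
    }
}
```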

Once we have the hydrants, a QueryRunner can be built for each of them and the runners merged, and the query can finally be executed.

Summary

This article covered the core of how realtime tasks serve queries without going into much detail; more to follow (TODO).


This content was provided by users or collected from the web for learning and reference purposes; copyright belongs to the original author.