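Overview
Preface
Earlier posts covered the realtime task workflow and some details of its stages, but not how realtime queries are served. Two facts come to mind right away:
- A realtime task is responsible for persisting data and must also answer queries
- A realtime task uses JVM heap memory as well as off-heap memory
This post looks at how a realtime task manages both. Put another way: how would you design something that ingests data in real time and keeps it queryable?
Here is an overview diagram
With some druid.io background, it is enough to get a feel for the overall flow here; the source code walkthrough follows.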
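Source code walkthrough
Back in the druid.io source code, the RealtimePlumber in the peon process receives every incoming row through io.druid.segment.realtime.plumber.RealtimePlumber#add. The source is as follows: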
@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
log.info("add row:" + row.toString());
long messageTimestamp = row.getTimestampFromEpoch();
final Sink sink = getSink(messageTimestamp);
metrics.reportMessageMaxTimestamp(messageTimestamp);
if (sink == null) {
return -1;
}
final int numRows = sink.add(row, false);
// Persist when the sink cannot accept more rows or the flush deadline has passed
if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
persist(committerSupplier.get());
}
return numRows;
}
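The log in the figure below comes from a real run, with a few log statements of my own added.
The Sink object is the important part here. io.druid.segment.realtime.plumber.RealtimePlumber#getSink looks like this: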
private Sink getSink(long timestamp)
{
if (!rejectionPolicy.accept(timestamp)) {
return null;
}
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(timestamp));
final long truncatedTime = truncatedDateTime.getMillis();
Sink retVal = sinks.get(truncatedTime);
if (retVal == null) {
final Interval sinkInterval = new Interval(
truncatedDateTime,
segmentGranularity.increment(truncatedDateTime)
);
log.info("RealtimePlumber.getSink inteval:" + sinkInterval.toString());
retVal = new Sink(
sinkInterval,
schema,
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
config.getMaxRowsInMemory(),
config.isReportParseExceptions()
);
addSink(retVal);
}
return retVal;
}
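- The sink is fetched from a Map<Long, Sink> sinks; if it is absent, it is constructed and added to the map, so the next call does not need to create a new Sink
- Tranquility tasks are usually hour-granularity tasks, so sinks usually holds a single key-value pair
- Every row goes through sink.add(row, false); and each time there is a check for whether a persist is needed
- The io.druid.segment.realtime.plumber.RealtimePlumber#addSink method also announces the temporary segment to ZooKeeper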
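The lookup described above can be modeled in a few lines. This is a minimal sketch, not Druid's code: SinkLookupSketch and ToySink are hypothetical names, the segment granularity is reduced to a fixed Duration, and the map is a plain single-threaded HashMap.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model (not Druid's Sink): one buffer per segment-granularity bucket.
class SinkLookupSketch
{
  // Stand-in for Druid's Sink; it only counts rows.
  static final class ToySink
  {
    final long bucketStart;
    int rows;

    ToySink(long bucketStart)
    {
      this.bucketStart = bucketStart;
    }

    int add()
    {
      return ++rows;
    }
  }

  private final long bucketMillis;
  private final Map<Long, ToySink> sinks = new HashMap<>();

  SinkLookupSketch(Duration segmentGranularity)
  {
    this.bucketMillis = segmentGranularity.toMillis();
  }

  // Truncate the timestamp to the start of its bucket, then get or create the sink.
  ToySink getSink(long timestampMillis)
  {
    long truncated = Math.floorDiv(timestampMillis, bucketMillis) * bucketMillis;
    return sinks.computeIfAbsent(truncated, ToySink::new);
  }

  int sinkCount()
  {
    return sinks.size();
  }

  public static void main(String[] args)
  {
    SinkLookupSketch plumber = new SinkLookupSketch(Duration.ofHours(1));
    long hour = Duration.ofHours(1).toMillis();
    plumber.getSink(10 * hour + 5).add();
    plumber.getSink(10 * hour + 999).add(); // same hour, same sink
    plumber.getSink(11 * hour + 1).add();   // next hour, new sink
    System.out.println("sinks: " + plumber.sinkCount());
  }
}
```

With hourly buckets, rows arriving within the same hour keep hitting the same map entry, which matches the observation that sinks usually holds one key-value pair.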
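Clearly the Sink class is the core here: both data handling and querying go through it.
Following the persist method, the source is below (with comments added in the code):
io.druid.segment.realtime.plumber.RealtimePlumber#persist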
@Override
public void persist(final Committer committer)
{
final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
for (Sink sink : sinks.values()) {
if (sink.swappable()) {
// Add currHydrant to indexesToPersist, then create a new FireHydrant and make it currHydrant
indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
}
}
log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);
persistExecutor.execute(
new ThreadRenamingRunnable(StringUtils.format("%s-incremental-persist", schema.getDataSource()))
{
@Override
public void doRun()
{
/* Note:
If plumber crashes after storing a subset of all the hydrants then we will lose data and next
time we will start with the commitMetadata stored in those hydrants.
option#1:
maybe it makes sense to store the metadata outside the segments in a separate file. This is because the
commit metadata isn't really associated with an individual segment-- it's associated with a set of segments
that are persisted at the same time or maybe whole datasource. So storing it in the segments is asking for problems.
Sort of like this:
{
"metadata" : {"foo": "bar"},
"segments": [
{"id": "datasource_2000_2001_2000_1", "hydrant": 10},
{"id": "datasource_2001_2002_2001_1", "hydrant": 12},
]
}
When a realtime node crashes and starts back up, it would delete any hydrants numbered higher than the
ones in the commit file.
option#2
We could also just include the set of segments for the same chunk of metadata in more metadata on each
of the segments. we might also have to think about the hand-off in terms of the full set of segments being
handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing
off and the others fail, the real-time would believe that it needs to re-ingest the data).
*/
long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime();
try {
// Iterate over indexesToPersist and persist each hydrant; the returned row count is collected as a metric
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(
persistHydrant(
pair.lhs, schema, pair.rhs, metadataElems
)
);
}
committer.run();
}
catch (Exception e) {
metrics.incrementFailedPersists();
throw e;
}
finally {
metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime);
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
}
}
}
);
final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
metrics.incrementPersistBackPressureMillis(startDelay);
if (startDelay > WARN_DELAY) {
log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
}
runExecStopwatch.stop();
// Reset nextFlush
resetNextFlush();
}
io.druid.segment.realtime.plumber.RealtimePlumber#persistHydrant
/**
* Persists the given hydrant and returns the number of rows persisted
*
* @param indexToPersist hydrant to persist
* @param schema datasource schema
* @param interval interval to persist
*
* @return the number of rows persisted
*/
protected int persistHydrant(
FireHydrant indexToPersist,
DataSchema schema,
Interval interval,
Map<String, Object> metadataElems
)
{
synchronized (indexToPersist) {
if (indexToPersist.hasSwapped()) {
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
);
return 0;
}
log.info(
"DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]",
schema.getDataSource(),
interval,
metadataElems,
indexToPersist
);
try {
int numRows = indexToPersist.getIndex().size();
final IndexSpec indexSpec = config.getIndexSpec();
indexToPersist.getIndex().getMetadata().putAll(metadataElems);
/**
* indexMerger: IndexMergerV9. Persisting writes the segment files under the basePersistDirectory configured for the task
* Earlier index files are merged here: io.druid.segment.IndexMergerV9#merge
*/
final File persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
interval,
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
indexSpec,
config.getSegmentWriteOutMediumFactory()
);
/**
* indexToPersist must stay queryable, so a new queryable segment is built: a QueryableIndexSegment (constructed from the new persistedFile)
*/
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegmentIdentifier(),
indexIO.loadIndex(persistedFile)
)
);
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
throw Throwables.propagate(e);
}
}
}
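At this point we can answer how queries use JVM on-heap and off-heap memory:
- On the JVM heap, currHydrant holds the in-memory data
- Off heap is the persistedFile produced by merging the small segment files, loaded into off-heap memory via mmap
Merging 1 and 2 at query time yields the result, and that is how a query works. The core class here is FireHydrant, and the core of FireHydrant is inevitably IncrementalIndex (OnheapIncrementalIndex & OffheapIncrementalIndex).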
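To make the on-heap and off-heap split concrete, here is a minimal sketch under heavy simplification. HydrantSketch is a hypothetical class, rows are plain long values, and a "persist" just writes them to a temp file and memory-maps it with FileChannel.map; Druid's real segment format and merge logic are far more involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model: on-heap rows for the current hydrant, mmap'ed files for persisted ones.
class HydrantSketch
{
  private final List<LongBuffer> persisted = new ArrayList<>(); // off-heap, memory-mapped
  private List<Long> current = new ArrayList<>();               // on-heap, still writable

  void add(long value)
  {
    current.add(value);
  }

  // Write the current in-memory rows to a file, map it, and start a fresh in-memory buffer.
  void persist()
  {
    if (current.isEmpty()) {
      return;
    }
    ByteBuffer bytes = ByteBuffer.allocate(current.size() * Long.BYTES);
    for (long v : current) {
      bytes.putLong(v);
    }
    bytes.flip();
    try {
      Path file = Files.createTempFile("hydrant-", ".bin");
      file.toFile().deleteOnExit();
      try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
        while (bytes.hasRemaining()) {
          ch.write(bytes);
        }
        // The mapping stays valid after the channel is closed.
        persisted.add(ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size()).asLongBuffer());
      }
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    current = new ArrayList<>();
  }

  // A query sees both the persisted (mapped) data and the rows still on the heap.
  long sum()
  {
    long total = 0;
    for (LongBuffer buf : persisted) {
      for (int i = 0; i < buf.limit(); i++) {
        total += buf.get(i);
      }
    }
    for (long v : current) {
      total += v;
    }
    return total;
  }

  int persistedCount()
  {
    return persisted.size();
  }

  int inMemoryCount()
  {
    return current.size();
  }

  public static void main(String[] args)
  {
    HydrantSketch sink = new HydrantSketch();
    sink.add(1);
    sink.add(2);
    sink.add(3);
    sink.persist(); // rows 1..3 now live in a mapped file
    sink.add(4);    // row 4 is still on the heap
    System.out.println("sum over both parts: " + sink.sum());
  }
}
```

The point of the sketch is only the shape: the query path walks the persisted pieces and the live in-memory piece and combines them, so newly added rows are visible before any persist happens.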
druid.io processes queries through the QueryRunner interface, and on top of it the QuerySegmentWalker interface is used to run a query for a given Query object:
/**
*/
public interface QuerySegmentWalker
{
/**
* Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s)
* such that it represents the interval.
*
* @param <T> query result type
* @param query the query to find a Queryable for
* @param intervals the intervals to find a Queryable for
* @return a Queryable object that represents the interval
*/
<T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals);
/**
* Gets the Queryable for a given list of SegmentSpecs.
*
* @param <T> the query result type
* @param query the query to return a Queryable for
* @param specs the list of SegmentSpecs to find a Queryable for
* @return the Queryable object with the given SegmentSpecs
*/
<T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
}
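The implementing classes are as follows; some of them look related to realtime tasks.
After adding logs, these turned out to be exactly the ones involved:
io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker#getQueryRunnerForIntervals
The query log is as follows.
Following the code leads to io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker#getQueryRunnerForSegments: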
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentIdentifier,
descriptor.getInterval().getStart(),
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final FireHydrant hydrant)
{
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new NoopQueryRunner<>();
}
// Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
log.info("hydrant.getAndIncrementSegment():" + hydrant);
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
return new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
descriptor,
objectMapper,
cache,
toolChest,
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return baseRunner;
}
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
)
)
),
toolChest,
sinkSegmentIdentifier,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
With a hydrant in hand, a QueryRunner can be constructed, and from there the query can be executed.
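The comment "Prevent the underlying segment from swapping when its being iterated" together with the Pair<Segment, Closeable> returned by getAndIncrementSegment() suggests a reference-counting scheme. The following is a minimal sketch of that general technique, assuming nothing about Druid's actual implementation; RefCountedSegmentSketch and Handle are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a reference-counted resource: it is released only
// after the owner and every in-flight query have let go of it.
class RefCountedSegmentSketch
{
  // Like Closeable, but close() declares no checked exception.
  interface Handle extends AutoCloseable
  {
    @Override
    void close();
  }

  private final AtomicInteger refs = new AtomicInteger(1); // starts at 1: the owner's reference
  private final AtomicBoolean ownerClosed = new AtomicBoolean(false);
  private final AtomicBoolean released = new AtomicBoolean(false);

  // Take a reference for the duration of a query; returns null if already released.
  Handle acquire()
  {
    int n;
    do {
      n = refs.get();
      if (n == 0) {
        return null;
      }
    } while (!refs.compareAndSet(n, n + 1));
    AtomicBoolean closed = new AtomicBoolean(false);
    // Closing the same handle twice must not decrement twice.
    return () -> {
      if (closed.compareAndSet(false, true)) {
        release();
      }
    };
  }

  // The owner drops its reference, for example when the segment is swapped out.
  void close()
  {
    if (ownerClosed.compareAndSet(false, true)) {
      release();
    }
  }

  private void release()
  {
    if (refs.decrementAndGet() == 0) {
      released.set(true); // real code would free files or buffers here
    }
  }

  boolean isReleased()
  {
    return released.get();
  }

  public static void main(String[] args)
  {
    RefCountedSegmentSketch segment = new RefCountedSegmentSketch();
    try (Handle h = segment.acquire()) {
      segment.close(); // owner swaps the segment out in the middle of the query
      System.out.println("released during query: " + segment.isReleased());
    }
    System.out.println("released after query: " + segment.isReleased());
  }
}
```

In this sketch the query's handle keeps the resource alive even if the owner closes it mid-query, which is the property the try/catch around CloseQuietly.close(segment.rhs) in the walker code appears to protect.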
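Summary
This post covered the core of how realtime tasks serve queries without going into much detail; the rest is a TODO for later posts.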