Overview
First, here is an example GET request followed by its response (taken from the official documentation):
GET twitter/_doc/0
{
"_index" : "twitter",
"_type" : "_doc",
"_id" : "0",
"_version" : 1,
"_seq_no" : 10,
"_primary_term" : 1,
"found": true,
"_source" : {
"user" : "kimchy",
"date" : "2009-11-15T14:12:12",
"likes": 0,
"message" : "trying out Elasticsearch"
}
}
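The analysis starts from this request.
- When Elasticsearch receives such a request, org.elasticsearch.rest.RestController#dispatchRequest() looks for a matching handler, and RestGetAction is the one that matches. This handler declares the URL patterns of the GET requests it accepts: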
public RestGetAction(final Settings settings, final RestController controller) {
    super(settings);
    controller.registerHandler(GET, "/{index}/_doc/{id}", this);
    controller.registerHandler(HEAD, "/{index}/_doc/{id}", this);
    // Deprecated typed endpoints.
    controller.registerHandler(GET, "/{index}/{type}/{id}", this);
    controller.registerHandler(HEAD, "/{index}/{type}/{id}", this);
}
public class ActionModule extends AbstractModule {
    public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
        // ...
        registerHandler.accept(new RestGetAction(settings, restController));
        // ...
    }
}
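The handler is registered when the node is initialized, in ActionModule#initRestHandlers.
- Once the handler is found, org.elasticsearch.rest.BaseRestHandler#handleRequest runs. It first calls org.elasticsearch.rest.action.document.RestGetAction#prepareRequest to process the request: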
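To make the dispatch step concrete, here is a minimal sketch of matching a request path against patterns such as /{index}/_doc/{id} and extracting the path parameters. It is not RestController's implementation (Elasticsearch uses a path trie internally); MiniRouter and its methods are hypothetical, and in this sketch the first registered matching route wins, which is why the _doc route is registered before the typed one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified router (not Elasticsearch's RestController).
// "{name}" segments capture path parameters; literal segments must match exactly.
// The first registered route that matches wins.
class MiniRouter {
    // A registered route: HTTP method, pattern split into segments, and a handler name.
    record Route(String method, String[] segments, String handler) {}

    // Result of a successful dispatch: handler name plus captured path parameters.
    record Match(String handler, Map<String, String> params) {}

    private final List<Route> routes = new ArrayList<>();

    void registerHandler(String method, String pattern, String handler) {
        routes.add(new Route(method, split(pattern), handler));
    }

    Optional<Match> dispatch(String method, String path) {
        String[] parts = split(path);
        for (Route route : routes) {
            if (!route.method().equals(method) || route.segments().length != parts.length) {
                continue;
            }
            Map<String, String> params = new HashMap<>();
            boolean matches = true;
            for (int i = 0; i < parts.length && matches; i++) {
                String segment = route.segments()[i];
                if (segment.startsWith("{") && segment.endsWith("}")) {
                    // Wildcard segment: capture the value under the placeholder name.
                    params.put(segment.substring(1, segment.length() - 1), parts[i]);
                } else {
                    matches = segment.equals(parts[i]);
                }
            }
            if (matches) {
                return Optional.of(new Match(route.handler(), params));
            }
        }
        return Optional.empty();
    }

    // Strip leading/trailing slashes and split "/a/b/c" into ["a", "b", "c"].
    private static String[] split(String path) {
        return path.replaceAll("^/+|/+$", "").split("/");
    }
}
```

With both routes registered, /twitter/_doc/0 resolves to the _doc route with index=twitter and id=0, while /twitter/tweet/1 falls through to the typed route.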
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    GetRequest getRequest;
    // Build the GetRequest; if a type is given, log a deprecation warning first
    if (request.hasParam("type")) {
        deprecationLogger.deprecatedAndMaybeLog("get_with_types", TYPES_DEPRECATION_MESSAGE);
        getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
    } else {
        getRequest = new GetRequest(request.param("index"), request.param("id"));
    }
    // Read refresh, routing, preference and realtime
    getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
    getRequest.routing(request.param("routing"));
    getRequest.preference(request.param("preference"));
    getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
    // "fields" is not supported; stored_fields must be used instead
    if (request.param("fields") != null) {
        throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
            "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
    }
    // Read stored_fields
    final String fieldsParam = request.param("stored_fields");
    if (fieldsParam != null) {
        final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
        if (fields != null) {
            getRequest.storedFields(fields);
        }
    }
    getRequest.version(RestActions.parseVersion(request));
    getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
    // Which _source fields to include or exclude
    getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
    return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
        @Override
        protected RestStatus getStatus(final GetResponse response) {
            return response.isExists() ? OK : NOT_FOUND;
        }
    });
}
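The parameter handling in prepareRequest can be summarized by a small, hypothetical parser. It mirrors three behaviours visible in the code: fields is rejected, stored_fields is split on commas, and realtime/refresh fall back to defaults (assumed here to be true and false, which we believe are GetRequest's defaults). GetParamParser and GetParams are illustrative names only, and unlike paramAsBoolean this sketch does not reject malformed booleans.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical sketch of the parameter handling in RestGetAction#prepareRequest.
// GetParamParser and GetParams are illustrative names, not Elasticsearch classes.
class GetParamParser {
    record GetParams(String index, String id, boolean realtime, boolean refresh,
                     String routing, String[] storedFields) {}

    static GetParams parse(Map<String, String> params) {
        // The legacy "fields" parameter is rejected, as in prepareRequest.
        if (params.get("fields") != null) {
            throw new IllegalArgumentException("the parameter [fields] is no longer supported");
        }
        // Assumed defaults: realtime=true, refresh=false.
        // Simplification: Boolean.parseBoolean treats any value other than "true" as false.
        boolean realtime = Boolean.parseBoolean(params.getOrDefault("realtime", "true"));
        boolean refresh = Boolean.parseBoolean(params.getOrDefault("refresh", "false"));
        // stored_fields is a comma-separated list; entries are trimmed and empty ones dropped.
        String[] storedFields = null;
        String raw = params.get("stored_fields");
        if (raw != null) {
            storedFields = Arrays.stream(raw.split(","))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .toArray(String[]::new);
        }
        return new GetParams(params.get("index"), params.get("id"), realtime, refresh,
                params.get("routing"), storedFields);
    }
}
```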
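prepareRequest returns a RestChannelConsumer, a functional interface whose body specifies what to execute, namely client.get(). Back in handleRequest, once this object has been obtained, the parameters and the request body are validated, and then the consumer is invoked: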
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    // This is where the functional-interface object is obtained
    final RestChannelConsumer action = prepareRequest(request, client);
    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }
    if (request.hasContent() && request.isContentConsumed() == false) {
        throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
    }
    usageCount.increment();
    // execute the action
    // Run the action defined in prepareRequest
    action.accept(channel);
}
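Returning a RestChannelConsumer means every parameter is read during preparation, so unknown parameters can be rejected before any work is done. A minimal sketch of that "prepare, validate, then execute" pattern, with hypothetical names (MiniRestHandler, TrackingRequest, ChannelConsumer) and a List standing in for the response channel:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of the pattern used by BaseRestHandler#handleRequest (not Elasticsearch code).
class MiniRestHandler {
    // Like RestChannelConsumer: deferred work that writes its result to a "channel".
    @FunctionalInterface
    interface ChannelConsumer {
        void accept(List<String> channel);
    }

    // Request wrapper that records which parameters the handler actually read.
    static class TrackingRequest {
        private final Map<String, String> params;
        private final Set<String> consumed = new HashSet<>();

        TrackingRequest(Map<String, String> params) {
            this.params = params;
        }

        String param(String key) {
            consumed.add(key);
            return params.get(key);
        }

        // Sorted, so error messages list unknown parameters in a stable order.
        SortedSet<String> unconsumed() {
            SortedSet<String> rest = new TreeSet<>(params.keySet());
            rest.removeAll(consumed);
            return rest;
        }
    }

    // prepareRequest: read parameters now, return the work to do later.
    static ChannelConsumer prepareRequest(TrackingRequest request) {
        String index = request.param("index");
        String id = request.param("id");
        return channel -> channel.add("get " + index + "/" + id);
    }

    // handleRequest: prepare first, reject unknown parameters, only then execute.
    static void handleRequest(TrackingRequest request, List<String> channel) {
        ChannelConsumer action = prepareRequest(request);
        SortedSet<String> unknown = request.unconsumed();
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unknown);
        }
        action.accept(channel);
    }
}
```

Because validation happens between prepareRequest and accept, a request with a typo in a parameter name fails fast and the action never runs.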
- The method that actually runs is org.elasticsearch.client.node.NodeClient#doExecute -> executeLocally(). Here it is:
public <Request extends ActionRequest,
        Response extends ActionResponse
        > Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
}
It first looks up the transport action for the request. For GET the action is GetAction.INSTANCE, which maps to TransportGetAction; this mapping is also registered at node startup:
actions.register(GetAction.INSTANCE, TransportGetAction.class);
With the action in hand, its execute method is called. TransportGetAction extends TransportSingleShardAction, and execute is inherited from the base class org.elasticsearch.action.support.TransportAction#execute. Here is what it does:
public final void execute(Task task, Request request, ActionListener<Response> listener) {
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }
    if (task != null && request.getShouldStoreResult()) {
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
    }
    // Build a filter chain: registered action filters (for example, from plugins) run before the action itself
    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
    requestFilterChain.proceed(task, actionName, request, listener);
}
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
    int i = index.getAndIncrement();
    try {
        if (i < this.action.filters.length) {
            // Run the next filter (e.g. plugin logic)
            this.action.filters[i].apply(task, actionName, request, listener, this);
        } else if (i == this.action.filters.length) {
            // All filters have run: execute the action's own logic
            this.action.doExecute(task, request, listener);
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch(Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}
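The index-based chain above is a chain of responsibility: each filter does its work and calls proceed again, and the call that comes after the last filter runs the action. A minimal sketch with hypothetical names; unlike RequestFilterChain, it throws directly instead of reporting errors to a listener.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the RequestFilterChain pattern (not Elasticsearch code).
class MiniFilterChain {
    // A filter does its own work and then calls chain.proceed(request) to continue.
    interface Filter {
        void apply(String request, MiniFilterChain chain);
    }

    private final List<Filter> filters;
    private final Consumer<String> action; // stands in for doExecute
    private final AtomicInteger index = new AtomicInteger();

    MiniFilterChain(List<Filter> filters, Consumer<String> action) {
        this.filters = filters;
        this.action = action;
    }

    void proceed(String request) {
        int i = index.getAndIncrement();
        if (i < filters.size()) {
            filters.get(i).apply(request, this); // hand over to the next filter
        } else if (i == filters.size()) {
            action.accept(request);              // every filter has run: execute the action
        } else {
            throw new IllegalStateException("proceed was called too many times");
        }
    }
}
```

A filter that does not call proceed short-circuits the chain, which is how such filters can block a request.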
Next comes the action's doExecute method. For TransportGetAction, doExecute is defined in TransportSingleShardAction:
// Creates an AsyncSingleAction object and calls its start method
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new AsyncSingleAction(request, listener).start();
}
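A new object is created here; let's look at its constructor first:
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
    // ...
    this.shardIt = shards(clusterState, internalRequest);
}
The important part is obtaining the shard information for the request. shards() is an abstract method that subclasses must implement: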
/**
 * Returns the candidate shards to execute the operation on or <code>null</code> the execute
 * the operation locally (the node that received the request)
 */
@Nullable
protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);
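Per this Javadoc, shards() returns null when the operation should run locally on the node that received the request; otherwise it returns the candidate shards to run on. The result is an iterator, so that if one shard copy fails the request can move on to the next. Once the object is constructed, start() is called: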
public void start() {
    if (shardIt == null) {
        // just execute it on the local node
        final Writeable.Reader<Response> reader = getResponseReader();
        // shards() returned null: send the request to the local node
        transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
            new TransportResponseHandler<Response>() {
                @Override
                public Response read(StreamInput in) throws IOException {
                    return reader.read(in);
                }
                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
                @Override
                public void handleResponse(final Response response) {
                    listener.onResponse(response);
                }
                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }
            });
    } else {
        // Send the request to the node holding one of the candidate shard copies
        perform(null);
    }
}
Now look at perform(); on failure it calls onFailure:
private void perform(@Nullable final Exception currentFailure) {
    // ...
    // Take the next shard copy; after a failure, the next copy is used
    final ShardRouting shardRouting = shardIt.nextOrNull();
    // ...
    // Find the node that holds this shard copy
    DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
    if (node == null) {
        onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
    } else {
        // ...
        final Writeable.Reader<Response> reader = getResponseReader();
        transportService.sendRequest(node, transportShardAction, internalRequest.request(),
            new TransportResponseHandler<Response>() {
                // ...
                @Override
                public void handleException(TransportException exp) {
                    onFailure(shardRouting, exp);
                }
            });
    }
}
// On failure, perform() above is called again, until a copy succeeds or all copies have failed
private void onFailure(ShardRouting shardRouting, Exception e) {
    if (e != null) {
        logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting,
            internalRequest.request()), e);
    }
    perform(e);
}
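The perform/onFailure pair forms a retry loop over the shard iterator. The sketch below reproduces that loop synchronously under simplifying assumptions: shard copies are plain node names, the hypothetical send function stands in for transportService.sendRequest and signals failure by throwing, and the local-execution branch (shards() returning null) is omitted.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, synchronous sketch of the perform()/onFailure() retry loop (not Elasticsearch code).
class ShardRetry {
    private final Iterator<String> copies;           // plays the role of ShardsIterator
    private final Function<String, String> send;      // node -> response; throws on failure
    private final Consumer<String> onResponse;
    private final Consumer<Exception> onFailure;

    ShardRetry(List<String> copies, Function<String, String> send,
               Consumer<String> onResponse, Consumer<Exception> onFailure) {
        this.copies = copies.iterator();
        this.send = send;
        this.onResponse = onResponse;
        this.onFailure = onFailure;
    }

    void start() {
        perform(null);
    }

    private void perform(Exception lastFailure) {
        if (!copies.hasNext()) {
            // Every copy has been tried: report the last failure to the caller.
            onFailure.accept(new IllegalStateException("no shard copy succeeded", lastFailure));
            return;
        }
        String node = copies.next();
        String response;
        try {
            response = send.apply(node);
        } catch (RuntimeException e) {
            perform(e); // like onFailure(shardRouting, e) -> perform(e): try the next copy
            return;
        }
        // Called outside the try block so a failing callback is not mistaken for a shard failure.
        onResponse.accept(response);
    }
}
```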
- The next step is transportService.sendRequest(node, transportShardAction, ...). The transportShardAction string is built when TransportGetAction is instantiated, in the constructor of its parent class:
protected TransportSingleShardAction(String actionName, ThreadPool threadPool, ClusterService clusterService,
                                     TransportService transportService, ActionFilters actionFilters,
                                     IndexNameExpressionResolver indexNameExpressionResolver,
                                     Supplier<Request> request,
                                     String executor) {
    super(actionName, actionFilters, transportService.getTaskManager());
    this.threadPool = threadPool;
    this.clusterService = clusterService;
    this.transportService = transportService;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    // The shard-level action name is defined here
    this.transportShardAction = actionName + "[s]";
    this.executor = executor;
    // Register a handler under the action name itself, used when other clients call this action
    if (!isSubAction()) {
        transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
    }
    // Register the handler for transportShardAction: this is the handler the request above ends up in
    transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
}
Whether transportService.sendRequest targets the local node or another node, it eventually calls messageReceived on the handler registered under that action name (transportShardAction):
// Handler executed for action[s]
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
}
}
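Because both handlers are registered by name, the caller only needs the action string; the transport layer decides how to reach the target node. The sketch below models that name-based lookup entirely in-process. MiniTransport is hypothetical, and the action name indices:data/read/get is used for illustration as what we believe GetAction's name to be.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of name-based handler registration and lookup (not Elasticsearch code).
class MiniTransport {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void registerRequestHandler(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for [" + action + "]");
        }
    }

    // The node argument is ignored here; a real transport would serialize the request
    // and send it over the network when the node is remote.
    String sendRequest(String node, String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for action [" + action + "]");
        }
        return handler.apply(request);
    }
}
```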
The method that ultimately runs is shardOperation, implemented by each subclass of TransportSingleShardAction:
// An abstract method that each concrete action must implement
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
So the code that finally serves a GET request is in TransportGetAction (org.elasticsearch.action.get.TransportGetAction#shardOperation):
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());
    // For a non-realtime GET with refresh=true, refresh the shard before reading
    if (request.refresh() && !request.realtime()) {
        indexShard.refresh("refresh_flag_get");
    }
    // Fetch the document using the request's id, stored fields, version and _source settings
    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
        request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);
}
- The document lookup eventually reaches org.elasticsearch.index.engine.InternalEngine#get, where the actual GET is performed:
// The concrete document-read path
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    // ...
        refresh("realtime_get", SearcherScope.INTERNAL, true);
        // ...
        // no version, get the version from the index, we know that we refresh on flush
        // Read the document through a searcher, i.e. via the Lucene API
        return getFromSearcher(get, searcherFactory, scope);
    }
}
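In older versions of ES, a GET read the latest version of a document from the translog to guarantee it returned the newest data. Current versions instead rely on a refresh that writes in-memory data into a segment, so that the subsequent lookup can read it through Lucene.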
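The refresh-based real-time GET, together with the refresh flag handled in shardOperation, can be illustrated with a toy model. This is not Lucene or InternalEngine code: writes land in an in-memory buffer invisible to readers, refresh() publishes the buffer as a new "segment", and a realtime get refreshes first only when the requested document has unpublished changes (a simplification assumed for the toy).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of refresh-based real-time GET (not Elasticsearch or Lucene code).
class ToyShard {
    private final Map<String, String> buffer = new HashMap<>();     // indexed but not yet visible
    private final Map<String, String> searchable = new HashMap<>(); // visible to readers
    private int segments = 0;

    void index(String id, String source) {
        buffer.put(id, source);
    }

    // Publish everything buffered as one new "segment".
    void refresh() {
        if (!buffer.isEmpty()) {
            searchable.putAll(buffer);
            buffer.clear();
            segments++;
        }
    }

    Optional<String> get(String id, boolean realtime, boolean refreshFlag) {
        if (refreshFlag && !realtime) {
            refresh();                      // like shardOperation's refresh("refresh_flag_get")
        } else if (realtime && buffer.containsKey(id)) {
            refresh();                      // toy rule: realtime get refreshes only when needed
        }
        return Optional.ofNullable(searchable.get(id));
    }

    int segmentCount() {
        return segments;
    }
}
```

Each triggered refresh creates a new segment in the toy, which is the cost the "Thoughts" below refer to when frequent real-time GETs follow writes.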
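At this point the GET request has its data. The MultiGet (mget) flow is largely similar: as with bulk requests, the items that target the same shard are grouped together and executed as one shard-level request, following essentially the same path as above.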
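Grouping mget items by target shard can be sketched as follows. Assumptions: routing defaults to the document id, every index has the same numShards, and the shard is computed with String.hashCode and Math.floorMod. Elasticsearch hashes the routing value with Murmur3, so real shard numbers differ; only the grouping idea carries over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of splitting a multi-get into per-shard groups (not Elasticsearch code).
class MultiGetGrouping {
    record Item(String index, String id, String routing) {
        // Routing falls back to the document id when not given.
        String effectiveRouting() {
            return routing != null ? routing : id;
        }
    }

    // Key is "index[shard]"; TreeMap keeps the groups in a stable order.
    static Map<String, List<Item>> groupByShard(List<Item> items, int numShards) {
        Map<String, List<Item>> byShard = new TreeMap<>();
        for (Item item : items) {
            // Simplified shard selection; NOT Elasticsearch's Murmur3-based routing.
            int shard = Math.floorMod(item.effectiveRouting().hashCode(), numShards);
            String key = item.index() + "[" + shard + "]";
            byShard.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        }
        return byShard;
    }
}
```

Items sharing a routing value always land in the same group, so each group can be sent as a single shard-level request.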
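Thoughts
- In ES, "real-time" means that GET and MGET return the latest data; it does not apply to search or aggregations.
- Since 5.x, real-time GET is implemented via refresh, which can slow down indexing.
- An update is a GET followed by a write. To guarantee consistency, realtime is forced to true, so an update may trigger a refresh and thus produce a new segment.
- When a read fails on one shard copy, the other replicas are tried.
References
- Zhang Chao, 《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice)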
- https://www.elastic.co/guide/en/elasticsearch/reference/7.2/docs-get.html