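I am 娇气荷花, a blogger on 靠谱客. I came across this article on the Elasticsearch GET request flow during recent development work, found it quite good, and am sharing it here in the hope that it serves as a useful reference.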

Overview

First, here is what a GET request looks like (example from the official documentation):

GET twitter/_doc/0

{
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "0",
    "_version" : 1,
    "_seq_no" : 10,
    "_primary_term" : 1,
    "found": true,
    "_source" : {
        "user" : "kimchy",
        "date" : "2009-11-15T14:12:12",
        "likes": 0,
        "message" : "trying out Elasticsearch"
    }
}

The analysis starts from this request.

  1. When Elasticsearch receives such a request, org.elasticsearch.rest.RestController#dispatchRequest() looks for a handler that matches it; RestGetAction is the matching handler. This handler defines the GET request formats it accepts:
 public RestGetAction(final Settings settings, final RestController controller) {
        super(settings);
        controller.registerHandler(GET, "/{index}/_doc/{id}", this);
        controller.registerHandler(HEAD, "/{index}/_doc/{id}", this);

        // Deprecated typed endpoints.
        controller.registerHandler(GET, "/{index}/{type}/{id}", this);
        controller.registerHandler(HEAD, "/{index}/{type}/{id}", this);
 }
public class ActionModule extends AbstractModule {
  public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    ······
    registerHandler.accept(new RestGetAction(settings, restController));  
    ······  
  }  
}    

This handler is registered when the node is initialized.
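The dispatch step can be illustrated with a minimal sketch of path-template matching: placeholders such as {index} and {id} capture path segments as request parameters, and when several templates match, this sketch prefers the one with more literal segments, so /twitter/_doc/0 goes to the typeless route rather than the deprecated /{index}/{type}/{id}. RouteTable and its methods are hypothetical; the real RestController stores routes in a path trie and its matching rules are more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified route table: (HTTP method, path template) -> handler name.
final class RouteTable {
    record Route(String method, String[] segments, String handler) {}
    record Match(String handler, Map<String, String> params) {}

    private final List<Route> routes = new ArrayList<>();

    void register(String method, String template, String handler) {
        routes.add(new Route(method, split(template), handler));
    }

    // Returns the matching route with the most literal (non-placeholder) segments.
    Optional<Match> dispatch(String method, String path) {
        String[] parts = split(path);
        Match best = null;
        int bestLiterals = -1;
        for (Route r : routes) {
            if (!r.method().equals(method) || r.segments().length != parts.length) continue;
            Map<String, String> params = new HashMap<>();
            int literals = 0;
            boolean ok = true;
            for (int i = 0; i < parts.length && ok; i++) {
                String seg = r.segments()[i];
                if (seg.startsWith("{") && seg.endsWith("}")) {
                    // Placeholder: capture the path segment as a parameter
                    params.put(seg.substring(1, seg.length() - 1), parts[i]);
                } else if (seg.equals(parts[i])) {
                    literals++;
                } else {
                    ok = false;
                }
            }
            if (ok && literals > bestLiterals) {
                best = new Match(r.handler(), params);
                bestLiterals = literals;
            }
        }
        return Optional.ofNullable(best);
    }

    // "/a/b" -> ["a", "b"]
    private static String[] split(String path) {
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.split("/");
    }
}
```

Registering both GET templates from RestGetAction and dispatching "/twitter/_doc/0" yields the typeless handler with index=twitter and id=0, while "/twitter/tweet/0" only matches the typed template.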

  2. Once the handler is found, org.elasticsearch.rest.BaseRestHandler#handleRequest runs first, and it starts by calling org.elasticsearch.rest.action.document.RestGetAction#prepareRequest to process the request:
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        GetRequest getRequest;
        // If a type is given, log a deprecation warning,
        // then build the GetRequest
        if (request.hasParam("type")) {
            deprecationLogger.deprecatedAndMaybeLog("get_with_types", TYPES_DEPRECATION_MESSAGE);
            getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
        } else {
            getRequest = new GetRequest(request.param("index"), request.param("id"));
        }

        // Apply the refresh, routing, preference and realtime parameters
        getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
        getRequest.routing(request.param("routing"));
        getRequest.preference(request.param("preference"));
        getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
        // [fields] is no longer supported; stored_fields must be used instead
        if (request.param("fields") != null) {
            throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
                "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
        }
        // Read stored_fields
        final String fieldsParam = request.param("stored_fields");
        if (fieldsParam != null) {
            final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
            if (fields != null) {
                getRequest.storedFields(fields);
            }
        }

        getRequest.version(RestActions.parseVersion(request));
        getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));

        // Source filtering: which _source fields to include or exclude
        getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));

        return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
            @Override
            protected RestStatus getStatus(final GetResponse response) {
                return response.isExists() ? OK : NOT_FOUND;
            }
        });
    }
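The parameter handling in prepareRequest can be reduced to a small sketch: reject the removed fields parameter, split stored_fields on commas, and fall back to GetRequest's defaults (realtime on, refresh off). SimpleGetRequest and parse are hypothetical stand-ins for GetRequest and RestGetAction#prepareRequest, a plain Map replaces RestRequest, and boolean parsing is simplified to accept only "true" and "false".

```java
import java.util.Map;

// Hypothetical stand-in for GetRequest, holding only the fields parsed below.
final class SimpleGetRequest {
    String index;
    String id;
    boolean realtime = true;   // GetRequest default: realtime on
    boolean refresh = false;   // GetRequest default: refresh off
    String[] storedFields;     // null when stored_fields is not given

    // Mirrors the shape of RestGetAction#prepareRequest, using a plain parameter map.
    static SimpleGetRequest parse(Map<String, String> params) {
        if (params.get("fields") != null) {
            throw new IllegalArgumentException("the parameter [fields] is no longer supported, "
                + "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
        }
        SimpleGetRequest r = new SimpleGetRequest();
        r.index = params.get("index");
        r.id = params.get("id");
        r.refresh = parseBoolean(params.get("refresh"), r.refresh);
        r.realtime = parseBoolean(params.get("realtime"), r.realtime);
        String fieldsParam = params.get("stored_fields");
        if (fieldsParam != null) {
            r.storedFields = fieldsParam.split(",");   // comma-separated list
        }
        return r;
    }

    // Simplified: absent -> default, otherwise only "true" or "false" are accepted.
    private static boolean parseBoolean(String value, boolean defaultValue) {
        if (value == null) return defaultValue;
        if (value.equals("true")) return true;
        if (value.equals("false")) return false;
        throw new IllegalArgumentException("Failed to parse value [" + value + "] as a boolean");
    }
}
```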

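prepareRequest returns a RestChannelConsumer, a functional interface that captures the work to be done, which here is client.get(). Back in handleRequest, this object is obtained first; the method then validates the parameters and the request body, and only after that runs the consumer: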

public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // prepare the request for execution; has the side effect of touching the request parameters
        // Obtain the functional interface returned by prepareRequest
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet<String> unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) {
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }

        if (request.hasContent() && request.isContentConsumed() == false) {
            throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
        }

        usageCount.increment();
        // execute the action
        // Run the function defined in prepareRequest (client.get(...))
        action.accept(channel);
    }
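Returning a RestChannelConsumer instead of executing immediately lets handleRequest validate everything first. A minimal sketch of that ordering, under assumed names (MiniRestHandler, TrackingParams and ChannelConsumer are hypothetical, and a List<String> stands in for the RestChannel): prepare() reads parameters through a tracker, any parameter that nobody read is rejected, and only then is the deferred action run.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical mini version of BaseRestHandler#handleRequest: prepare, validate, then execute.
abstract class MiniRestHandler {
    // Plays the role of RestChannelConsumer: deferred work that writes to a "channel".
    interface ChannelConsumer extends Consumer<List<String>> {}

    // Wraps the request parameters and records which ones were read.
    static final class TrackingParams {
        private final Map<String, String> params;
        private final Set<String> consumed = new HashSet<>();

        TrackingParams(Map<String, String> params) { this.params = new HashMap<>(params); }

        String param(String key) {
            consumed.add(key);
            return params.get(key);
        }

        // Sorted, so error messages list parameters in a stable order
        SortedSet<String> unconsumed() {
            SortedSet<String> rest = new TreeSet<>(params.keySet());
            rest.removeAll(consumed);
            return rest;
        }
    }

    // Reads parameters and returns the work to do, without doing it yet.
    protected abstract ChannelConsumer prepare(TrackingParams params);

    final void handle(Map<String, String> rawParams, List<String> channel) {
        TrackingParams params = new TrackingParams(rawParams);
        ChannelConsumer action = prepare(params);
        SortedSet<String> unconsumed = params.unconsumed();
        if (!unconsumed.isEmpty()) {
            // Rejected before any work has run
            throw new IllegalArgumentException("request contains unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }
}
```

The design choice shows in the failure case: a request with an unknown parameter is rejected without client.get() (here, the lambda) ever running.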
  3. The method actually executed is org.elasticsearch.client.node.NodeClient#doExecute -> executeLocally(). Here it is:
public <    Request extends ActionRequest,
                Response extends ActionResponse
            > Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
        return transportAction(action).execute(request, listener);
}

It first looks up the transport action for the request. For GET, the action is GetAction.INSTANCE, which maps to TransportGetAction; this mapping is also registered at node startup:

actions.register(GetAction.INSTANCE, TransportGetAction.class);
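The lookup in executeLocally is essentially a registry keyed by action. A hypothetical sketch: ActionRegistry and its nested TransportAction interface are illustrative, a String name stands in for the Action object, and a Consumer stands in for ActionListener.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry: action name -> transport action, resolved per request.
final class ActionRegistry {
    // Minimal "transport action": handles a request and delivers the response to a listener.
    interface TransportAction<Req, Resp> {
        void execute(Req request, Consumer<Resp> listener);
    }

    private final Map<String, TransportAction<?, ?>> actions = new HashMap<>();

    // Done once at startup, like actions.register(GetAction.INSTANCE, TransportGetAction.class)
    <Req, Resp> void register(String name, TransportAction<Req, Resp> action) {
        if (actions.putIfAbsent(name, action) != null) {
            throw new IllegalArgumentException("action [" + name + "] already registered");
        }
    }

    // Counterpart of NodeClient#executeLocally: look up the action, then execute it.
    @SuppressWarnings("unchecked")
    <Req, Resp> void executeLocally(String name, Req request, Consumer<Resp> listener) {
        TransportAction<Req, Resp> action = (TransportAction<Req, Resp>) actions.get(name);
        if (action == null) {
            throw new IllegalStateException("failed to find action [" + name + "] to execute");
        }
        action.execute(request, listener);
    }
}
```

For example, registering a handler under "indices:data/read/get" (the name GetAction uses) and calling executeLocally with that name routes the request to it.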

Once the action is obtained, its execute method is called. TransportGetAction extends TransportSingleShardAction, and execute is inherited from the base class org.elasticsearch.action.support.TransportAction#execute. Here is what it does:

public final void execute(Task task, Request request, ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }
        // Build a filter chain: registered action filters (e.g. from plugins) run before the action itself
        RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
        requestFilterChain.proceed(task, actionName, request, listener);
}

public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < this.action.filters.length) {
                    // Run the next filter (e.g. one contributed by a plugin)
                    this.action.filters[i].apply(task, actionName, request, listener, this);
                } else if (i == this.action.filters.length) {
                    // All filters have run: execute the action's own logic
                    this.action.doExecute(task, request, listener);
                } else {
                    listener.onFailure(new IllegalStateException("proceed was called too many times"));
                }
            } catch(Exception e) {
                logger.trace("Error during transport action execution.", e);
                listener.onFailure(e);
            }
}
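The filter chain is an index-driven chain of responsibility: each call to proceed() advances a shared counter, so filters run in order and the action runs exactly once after the last filter. A hypothetical, simplified version with String requests (FilterChain, RequestFilter and ResponseListener are illustrative names, not Elasticsearch types):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical listener with the same two callbacks as ActionListener.
interface ResponseListener {
    void onResponse(String response);
    void onFailure(Exception e);
}

// A filter may inspect or modify the request; it continues the chain by calling proceed().
interface RequestFilter {
    void apply(String request, ResponseListener listener, FilterChain chain);
}

// Simplified version of the RequestFilterChain created in TransportAction#execute.
final class FilterChain {
    private final List<RequestFilter> filters;
    private final BiConsumer<String, ResponseListener> action;   // stands in for doExecute
    private final AtomicInteger index = new AtomicInteger();

    FilterChain(List<RequestFilter> filters, BiConsumer<String, ResponseListener> action) {
        this.filters = filters;
        this.action = action;
    }

    void proceed(String request, ResponseListener listener) {
        int i = index.getAndIncrement();
        try {
            if (i < filters.size()) {
                filters.get(i).apply(request, listener, this);     // next filter
            } else if (i == filters.size()) {
                action.accept(request, listener);                  // all filters passed
            } else {
                listener.onFailure(new IllegalStateException("proceed was called too many times"));
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }
}
```

A filter can also short-circuit, for example a security check that calls listener.onFailure(...) without calling proceed(), in which case the action never runs.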

Next, the action's doExecute method runs. For TransportGetAction, doExecute is implemented in TransportSingleShardAction:

// Create an AsyncSingleAction and call its start() method
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new AsyncSingleAction(request, listener).start();
}

This creates a new AsyncSingleAction; let's look at its constructor first:

private AsyncSingleAction(Request request, ActionListener<Response> listener) {
            ······
            this.shardIt = shards(clusterState, internalRequest);
        }

The key step here is resolving the shards the request should be sent to. shards() is an abstract method that subclasses must implement:

    /**
     * Returns the candidate shards to execute the operation on or <code>null</code> the execute
     * the operation locally (the node that received the request)
     */
    @Nullable
    protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);

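According to the Javadoc, the method returns either the candidate shards to run the operation on, or null to run it locally on the node that received the request. The result is an iterator so that, if the request fails on one shard copy, it can move on to the next. Once the object is built, start() is called: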

public void start() {
            if (shardIt == null) {
                // just execute it on the local node
                final Writeable.Reader<Response> reader = getResponseReader();
                // shards() returned null: send the request to the local node
                transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
                    new TransportResponseHandler<Response>() {
                    @Override
                    public Response read(StreamInput in) throws IOException {
                        return reader.read(in);
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }

                    @Override
                    public void handleResponse(final Response response) {
                        listener.onResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        listener.onFailure(exp);
                    }
                });
            } else {
                // Otherwise send the request to the node that holds a shard copy
                perform(null);
            }
}

Next is perform(). If a send fails, it calls onFailure():

private void perform(@Nullable final Exception currentFailure) {
            ······
            // Take the next shard copy; after a failure, this moves on to the next copy
            final ShardRouting shardRouting = shardIt.nextOrNull();
            ·····
            // Look up the node that hosts this shard copy
            DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
            if (node == null) {
                onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
            } else {
                  ······
                }
                final Writeable.Reader<Response> reader = getResponseReader();
                transportService.sendRequest(node, transportShardAction, internalRequest.request(),
                    new TransportResponseHandler<Response>() {
                       ······
                        @Override
                        public void handleException(TransportException exp) {
                            onFailure(shardRouting, exp);
                        }
                });
            }
}

// On failure, perform() is called again, until a copy succeeds or all copies have failed
private void onFailure(ShardRouting shardRouting, Exception e) {
            if (e != null) {
                logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting,
                    internalRequest.request()), e);
            }
            perform(e);
}
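The perform()/onFailure() pair amounts to trying shard copies in order until one answers. The sketch below models that loop synchronously: ShardRetry, ShardCopy and the map of node functions are hypothetical, each function stands in for transportService.sendRequest to that node, and an unknown node counts as a failure, much like the NoShardAvailableActionException case above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of AsyncSingleAction#perform/onFailure: try shard copies in order
// until one succeeds or the iterator is exhausted. Calls are synchronous for simplicity.
final class ShardRetry {
    record ShardCopy(String shardId, String nodeId) {}

    private final Iterator<ShardCopy> shardIt;
    private final Map<String, Function<ShardCopy, String>> nodes;  // nodeId -> stand-in for sendRequest
    private final Consumer<String> onResponse;
    private final Consumer<Exception> onFailure;

    ShardRetry(List<ShardCopy> copies, Map<String, Function<ShardCopy, String>> nodes,
               Consumer<String> onResponse, Consumer<Exception> onFailure) {
        this.shardIt = copies.iterator();
        this.nodes = nodes;
        this.onResponse = onResponse;
        this.onFailure = onFailure;
    }

    void start() {
        perform(null);
    }

    private void perform(Exception lastFailure) {
        if (!shardIt.hasNext()) {
            // No copies left: report the last failure to the caller
            onFailure.accept(new IllegalStateException("no shard copy could serve the request", lastFailure));
            return;
        }
        ShardCopy copy = shardIt.next();
        Function<ShardCopy, String> node = nodes.get(copy.nodeId());
        if (node == null) {
            // The node hosting this copy is unknown: treat it as a failure and move on
            perform(new IllegalStateException("node [" + copy.nodeId() + "] not found for " + copy.shardId()));
            return;
        }
        String response;
        try {
            response = node.apply(copy);
        } catch (RuntimeException e) {
            perform(e);   // retry on the next copy
            return;
        }
        // Called outside the try, so an exception in the caller's callback is not mistaken for a shard failure
        onResponse.accept(response);
    }
}
```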
  4. The next step is transportService.sendRequest(node, transportShardAction ·····). The transportShardAction string is built when TransportGetAction is constructed, in the TransportSingleShardAction constructor it calls:
protected TransportSingleShardAction(String actionName, ThreadPool threadPool, ClusterService clusterService,
                                         TransportService transportService, ActionFilters actionFilters,
                                         IndexNameExpressionResolver indexNameExpressionResolver, 		  
                                         Supplier<Request> request,
                                         String executor) {
        super(actionName, actionFilters, transportService.getTaskManager());
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        // The shard-level action name is defined here
        this.transportShardAction = actionName + "[s]";
        this.executor = executor;
        // Register a handler for the top-level action so that other clients can call it
        if (!isSubAction()) {
            transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
        }
        // Register the handler for transportShardAction, i.e. the handler that the sendRequest call above needs
        transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
    }

Whether transportService.sendRequest sends the request to the local node or to another node, it always ends up calling messageReceived on the handler registered under that action name (transportShardAction):

    // Handler executed for the action[s] name
    private class ShardTransportHandler implements TransportRequestHandler<Request> {

        @Override
        public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
            }
            asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
        }
    }
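The naming and delivery rules, where the shard-level name is the action name plus [s] and a send (local or remote) ends in the handler registered under that name on the target node, can be sketched as follows. MiniNode, MiniCluster and registerShardAction are hypothetical; unlike a real transport, a remote send here is a direct method call with no serialization.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical node with a per-node handler registry keyed by action name.
final class MiniNode {
    final String id;
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    MiniNode(String id) { this.id = id; }

    void registerRequestHandler(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler for [" + action + "] already registered");
        }
    }

    // Stand-in for TransportRequestHandler#messageReceived on this node.
    String messageReceived(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler registered for [" + action + "]");
        }
        return handler.apply(request);
    }

    // Mirrors the constructor above: the shard-level action name is actionName + "[s]".
    static String registerShardAction(MiniNode node, String actionName, Function<String, String> shardOperation) {
        String transportShardAction = actionName + "[s]";
        node.registerRequestHandler(transportShardAction, shardOperation);
        return transportShardAction;
    }
}

// Hypothetical cluster: sending to the local node or to another node takes the same path.
final class MiniCluster {
    private final Map<String, MiniNode> nodes = new HashMap<>();

    void add(MiniNode node) { nodes.put(node.id, node); }

    String sendRequest(String nodeId, String action, String request) {
        MiniNode target = nodes.get(nodeId);
        if (target == null) {
            throw new IllegalStateException("unknown node [" + nodeId + "]");
        }
        // A real transport would serialize the request for a remote node; here it is a direct call.
        return target.messageReceived(action, request);
    }
}
```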

What finally runs (reached through asyncShardOperation) is the shardOperation method implemented by the concrete subclass of TransportSingleShardAction:

// Also abstract (a method, not a class): each concrete action implements it
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

So the code a GET request ultimately runs lives in TransportGetAction (org.elasticsearch.action.get.TransportGetAction#shardOperation):

protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        // For a non-realtime GET with refresh=true, refresh the shard before reading
        if (request.refresh() && !request.realtime()) {
            indexShard.refresh("refresh_flag_get");
        }

        // Fetch the document (type, id) with the requested stored fields, version and source filtering
        GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
                request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
        return new GetResponse(result);
}
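TransportSingleShardAction follows the template-method pattern: the base class owns the shard-level plumbing and calls an abstract shardOperation that each action fills in. A hypothetical sketch using the GET condition shown above (refresh only when refresh=true and realtime=false); SingleShardAction, SketchGetAction, GetReq and FakeShard are illustrative names, and the shard is a plain map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical template-method sketch of TransportSingleShardAction and TransportGetAction.
abstract class SingleShardAction<Req, Resp> {
    // Shared shard-level entry point (the role ShardTransportHandler plays);
    // the per-action logic is delegated to shardOperation.
    final Resp messageReceived(Req request, int shardId) {
        return shardOperation(request, shardId);
    }

    protected abstract Resp shardOperation(Req request, int shardId);
}

// Hypothetical request carrying only the flags relevant here.
record GetReq(String id, boolean realtime, boolean refresh) {}

// Stand-in for IndexShard: a document map plus a log of refresh reasons.
final class FakeShard {
    final Map<String, String> docs = new HashMap<>();
    final List<String> refreshReasons = new ArrayList<>();

    void refresh(String reason) { refreshReasons.add(reason); }
}

final class SketchGetAction extends SingleShardAction<GetReq, String> {
    private final Map<Integer, FakeShard> shards;

    SketchGetAction(Map<Integer, FakeShard> shards) { this.shards = shards; }

    @Override
    protected String shardOperation(GetReq request, int shardId) {
        FakeShard shard = shards.get(shardId);
        // Same condition as in TransportGetAction#shardOperation above
        if (request.refresh() && !request.realtime()) {
            shard.refresh("refresh_flag_get");
        }
        return shard.docs.get(request.id());
    }
}
```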
  5. Fetching the document eventually reaches org.elasticsearch.index.engine.InternalEngine#get, where the actual GET is performed:
    // The read path for a single document
    public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
            ·······
            refresh("realtime_get", SearcherScope.INTERNAL, true);
            ······
            // no version, get the version from the index, we know that we refresh on flush
            // Read through a searcher,
            // i.e. fetch the document via the Lucene API
            return getFromSearcher(get, searcherFactory, scope);
        }
    }

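In older versions of ES, a GET read the latest version of the document from the translog to guarantee it returned the newest data. The current version relies on refresh instead: refresh writes the in-memory data into a segment, so the subsequent read can get the document from Lucene.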
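The refresh-based realtime GET can be illustrated with a toy model. This is a deliberate simplification, not InternalEngine: writes land in a pending buffer, only refreshed documents are visible to the searcher, and a realtime get refreshes first when the requested document is still pending (InternalEngine makes a roughly comparable decision using its version map). A non-realtime get reads only what is already searchable. ToyEngine and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (not Elasticsearch code) of "realtime GET via refresh".
final class ToyEngine {
    private final Map<String, String> pending = new HashMap<>();   // written, not yet refreshed
    private Map<String, String> searchable = new HashMap<>();      // visible to the searcher
    private int refreshCount = 0;

    void index(String id, String source) { pending.put(id, source); }

    // Publishes pending writes as a new "segment" visible to reads.
    void refresh() {
        if (pending.isEmpty()) return;
        Map<String, String> next = new HashMap<>(searchable);
        next.putAll(pending);
        pending.clear();
        searchable = next;
        refreshCount++;
    }

    Optional<String> get(String id, boolean realtime) {
        if (realtime && pending.containsKey(id)) {
            refresh();   // make the latest write visible before reading
        }
        return Optional.ofNullable(searchable.get(id));
    }

    int refreshCount() { return refreshCount; }
}
```

In this model, every realtime get of a freshly written document publishes a new segment, which is why frequent realtime gets can slow down indexing.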

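At this point the GET request has its data. A multi-get (MultiGet) request follows essentially the same path: like a bulk request, it groups the requests that target the same shard so that each group is executed together, and the per-shard flow is much the same as above.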
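Grouping multi-get items by shard can be sketched as below. The shard formula is an assumption for illustration only: it uses String.hashCode, whereas Elasticsearch hashes the routing value (the _id by default) with Murmur3 and also accounts for the routing factor. MultiGetGrouping, Item and ShardKey are hypothetical names, and a single shard count is assumed for all indices.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: group multi-get items so each shard receives one shard-level request.
final class MultiGetGrouping {
    record Item(String index, String id, String routing) {}
    record ShardKey(String index, int shard) {}

    // Simplified routing, NOT Elasticsearch's formula: routing value defaults to the id.
    static int shardFor(Item item, int numberOfShards) {
        String routing = item.routing() != null ? item.routing() : item.id();
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    // Preserves first-seen order of shards; items keep their original order within a group.
    static Map<ShardKey, List<Item>> groupByShard(List<Item> items, int numberOfShards) {
        Map<ShardKey, List<Item>> groups = new LinkedHashMap<>();
        for (Item item : items) {
            ShardKey key = new ShardKey(item.index(), shardFor(item, numberOfShards));
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        }
        return groups;
    }
}
```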

Thoughts

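  • "Realtime" in ES means that GET and MGET return the latest data; it does not apply to search or aggregations.
  • Since 5.x, realtime is implemented through refresh, which can hurt indexing throughput.
  • An update is a GET followed by a write. To keep it consistent, realtime is forced to true, so an update may trigger a refresh and thereby create a new segment.
  • If a read fails on one shard copy, the other copies are tried.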
References
  • Zhang Chao, 《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice)
  • https://www.elastic.co/guide/en/elasticsearch/reference/7.2/docs-get.html


This content was contributed by users or collected from the web for learning and reference; copyright belongs to the original author.