概述
我们知道elasticsearch可以通过指定index和doc id来获取某个doc的,也支持mget的方式,发送一次请求,将多个doc id发送过去查询出相应的docs。
这样做可以有效的减少发往ES的请求数,降低ES的负载。
在web应用层通过HystrixCollapser合并单个get请求为mget请求的处理方式,就能大大提升系统的TPS.
以下为代码示例:
//利用hystrix合并请求
@HystrixCollapser(batchMethod = "mGetDataFromEs", scope = GLOBAL, collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "50"),
@HystrixProperty(name = "maxRequestsInBatch", value = "200"),
})
@Cacheable(value = "esRequestCache", cacheManager = "esRequestCacheManager", key = "#doc.index+':'+#doc.type+':'+#doc.id", unless = "#result == null || #result.size() ==0")
public List<RecommendItem> getDataFromEs3(EsDocGetRequest doc) {
return null;
}
@HystrixCommand(fallbackMethod = "mGetDataFromEsFallback")
public List<List<RecommendItem>> mGetDataFromEs(List<EsDocGetRequest> docs) throws IOException {
logger.debug("================mGetDataFromEs==============");
List<List<RecommendItem>> datas = new ArrayList<>();
List<Doc> docList = new ArrayList<>();
for (EsDocGetRequest req : docs) {
docList.add(new Doc(req.getIndex(), req.getType(), req.getId()));
}
long start = System.currentTimeMillis();
JestResult result = jestClient.execute(new MultiGetExt.Builder.ByDoc(docList).build());
logger.info("============mGetDataFromEs cost time:{}", (System.currentTimeMillis() - start));
//
assertTrue(result.getErrorMessage(), result.isSucceeded());
if (result.isSucceeded()) {
JsonArray actualDocs = result.getJsonObject().getAsJsonArray("docs");
for (int i = 0; i < docs.size(); i++) {
boolean hasError = actualDocs.get(i).getAsJsonObject().has("error");
boolean found = !hasError && actualDocs.get(i).getAsJsonObject().getAsJsonPrimitive("found").getAsBoolean();
if (found) {
EsRcmdResult esRcmdResult = gson.fromJson(actualDocs.get(i).getAsJsonObject().get("_source"), EsRcmdResult.class);
datas.add(esRcmdResult.getData());
} else {
datas.add(Collections.emptyList());
}
}
return datas;
} else {
throw new ServiceException(result.getErrorMessage());
}
}
public List<List<RecommendItem>> mGetDataFromEsFallback(List<EsDocGetRequest> docs) throws IOException {
logger.error("============mGetDataFromEs fallback=============");
return Collections.emptyList();
}
我们利用@HystrixCollapser注解的方式来实现合并请求的目的,这个注解是打在单个请求处理方法上的,主要配置了几个属性:
batchMethod = "mGetDataFromEs"
指定合并处理的方法名,这里单个请求方法和批处理方法的定义非常有考究,具体可以看一下这个注解的注释文档,方法只能接受一个参数,如果你需要传递多个参数,那么请将它们封装成一个类参数。然后批处理方法的参数必须是List类型的参数,泛型和单处理方法中的参数类型一致,例如示例代码中那样,单处理方法的参数为EsDocGetRequest,批处理方法的参数为List<EsDocGetRequest>。另外单处理方法的方法体实际是不会执行的,返回null即可。
@HystrixProperty(name = "timerDelayInMilliseconds", value = "50"),
@HystrixProperty(name = "maxRequestsInBatch", value = "200"),
这两个属性表示合并请求的时间窗口为50ms,窗口时间内最多合并200个请求。默认值是10ms,合并请求数不限。这个根据自己实际情况来设定。
timerDelayInMilliseconds建议尽量设置的小一点,如果并发量不大的话,其实也没有必要使用HystrixCollapser来处理
但是一旦用了HystrixCollapser之后,请求的最小响应延迟一定是大于这个时间窗口了,因为接受到的请求都要经过这个窗口时间的聚合,然后再进行处理,在分别返回给具体的调用方的。如果单个请求的响应时间比较大的话,并不会有多少影响,如果单次响应时间很小的话比如几毫秒,用了合并请求特性之后,至少都变成50几毫秒了,反而造成了响应时间变长的负作用。
scope = GLOBAL
合并作用域,默认是REQUEST,就是不会跨越多个请求会话的,只在当前用户请求中合并多次请求为批处理请求。这里改成GLOBAL,就是可以跨越request context,合并不同用户的请求为一次批处理请求。
另外我在单次请求上海使用了@Cachable注解用来缓存请求结果到redis,不是本文的讨论范畴就不展开说了。
最后就是要在批处理方法上加上@HystrixCommand注解,还配置了一个fallback方法进行熔断快速失败响应。
最后最关键的一点就是批处理方法的实现,方法参数接收到的这个list的size可以理解为本次一共聚合了多少个request
List<EsDocGetRequest> docs
那么,批处理方法返回的集合大小,一定要跟请求参数的集合大小一致,不然就会报错:
Caused by: rx.exceptions.OnErrorNotImplementedException: Failed to map all collapsed requests to response. The expected contract has not been respected. Collapser key: 'getDataFromEs3', requests size: '46', response size: '39'
最后
以上就是义气盼望为你收集整理的通过HystrixCollapser合并请求提高应用吞吐量的全部内容,希望文章能够帮你解决通过HystrixCollapser合并请求提高应用吞吐量所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复