概述
最近在项目中有看到一种比较实用的ElasticSearch工具类封装方式,特此记录便于日后查阅。
1、controller层
@RequestMapping(value = "/uri/page", method = RequestMethod.GET)
public DataResult page(
@RequestParam(name = "pageIndex") Integer pageIndex,
@RequestParam(name = "pageSize") Integer pageSize,
@RequestParam(name = "uri") String uri,
@RequestParam(name = "api") String api,
@RequestParam(name = "method") String method) {
try {
List<Filter> filters = new ArrayList();
if(StringUtils.isNotBlank(uri)){
filters.add(new Filter("uri", FilterEnum.INCLUDE_NO_SPLIT.getType(), uri));
}
if(StringUtils.isNotBlank(api)){
filters.add(new Filter("api", FilterEnum.EQUAL.getType(), api.toLowerCase()));
}
if(StringUtils.isNotBlank(method)){
filters.add(new Filter("method", FilterEnum.EQUAL.getType(), method.toLowerCase()));
}
return new DataResult(SystemStatusCode.NORMAL.getValue(), "ok", topStatisticsService.findTopUriByCondition(filters, pageIndex, pageSize));
} catch (Exception e) {
LOGGER.error("获取uri top分页数据失败", e);
return new DataResult(SystemStatusCode.ERROR.getValue(), "fail", e.getMessage());
}
}
2、FilterEnum
package com.fenqile.sgp.api.common;
/**
* @author sherrycao
* @version 2019/3/12
*/
public enum FilterEnum {
INCLUDE(1, "包含"),
EXCLUDE(2, "不包含"),
EQUAL(3, "等于"),
UNEQUAL(4, "不等于"),
INCLUDE_NO_SPLIT(1, "包含,但不分词"),
EQUAL_NO_SPLIT(5, "等于,但不分词");
private int type;
private String desc;
FilterEnum(int type, String desc) {
this.type = type;
this.desc = desc;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
3、Service层
public PageInfo<TreeComplexVo> findTopUriByCondition(List<Filter> filters, int pageIndex, int pageSize) throws Exception {
PageInfo<TreeComplexVo> pageInfo = new PageInfo<>();
// 获取所有index
List<String> indices = Arrays.asList(EsConstants.USER_TOP_URI_INDEX_PREFIX);
for (String index : indices) {
elasticSearchDao.createIndex(EsClient.getClient(), index);
}
SearchSourceBuilder searchSourceBuilder = structSearchSourceBuilder(filters, pageIndex, pageSize);
searchSourceBuilder.sort("complexScore", SortOrder.DESC);
List<TreeComplexVo> treeComplexVo = new ArrayList();
String tableName = topProviderService.getEsTableName(EsConstants.TOP_URI, topProviderService.getEsTableIndex(EsConstants.TOP_URI));
SearchResult searchResult = elasticSearchDao.search(EsClient.getClient(), indices, tableName, searchSourceBuilder.toString());
List<SearchResult.Hit<JSONObject, Void>> hits = searchResult.getHits(JSONObject.class);
for (SearchResult.Hit<JSONObject, Void> hit : hits) {
treeComplexVo.add(JSON.parseObject(hit.source.toString(), TreeComplexVo.class));
}
Long total = searchResult.getTotal();
pageInfo.setTotal(total);
pageInfo.setPageNum(pageIndex);
pageInfo.setPageSize(pageSize);
pageInfo.setList(treeComplexVo);
Long num = total % pageSize;
int page;
if (num == 0) {
page = (int) (total / pageSize);
} else {
page = (int) (total / pageSize) + 1;
}
pageInfo.setPages(page);
// 生成query
return pageInfo;
}
@Override
public Boolean loadServiceCount( int pageSize, Map<String, Integer> appMethodCountMap) {
//计算新的表索引,这样轮流覆盖,暂时做0,1轮训
Integer newIndex = getEsTableIndex(EsConstants.TOP_SERVICE)+1;
String tableName = getEsTableName(EsConstants.TOP_SERVICE, newIndex);
try{
//清理旧数据
elasticSearchDao.deleteDocAll(EsClient.getClient(), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);
}catch (Exception e){
LOGGER.error("清理es数据失败", e);
}
Map<String, ServiceCountVo> maps = new HashMap();
for(EnvironmentType environmentType: EnvironmentType.values()){
try{
//切换当前的zk
ServiceService serviceService = (ServiceService)GovernanceConfig.getEnvServiceMap().get(environmentType.getType()).get(SERVICE_PROVIDER_NAME);
int pageIndex = 0;
int resultSize = 0;
do {
//查询分页数据
pageIndex +=1;
PageInfo pageInfo = serviceService.getServiceCount(pageIndex, pageSize);
resultSize = appendSingleZkServiceCount(maps, environmentType, pageInfo, appMethodCountMap);
//分页数据刚好是指定分页大小就继续查询
}while(resultSize==pageSize);
}catch(Exception e){
LOGGER.error("分页获取zk数据失败", e);
}
}
/**
* 分页插入数据到es
*/
List results = new ArrayList(maps.values());
int batchSize = results.size()%ES_STORE_BATCH_SIZE==0?results.size()/ES_STORE_BATCH_SIZE:1+results.size()/ES_STORE_BATCH_SIZE;
for(int i=1;i<=batchSize;i++){
int start = Math.max(0, i-1)*ES_STORE_BATCH_SIZE;
int end = Math.min(results.size(), start+ES_STORE_BATCH_SIZE);
elasticSearchDao.insertDocuments(EsClient.getClient(), results.subList(start, end), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);
}
//成功后更新表索引,方便查询使用
setEsTableIndex(EsConstants.TOP_SERVICE, newIndex);
return true;
}
4、EsClient层
package com.fenqile.sgp.web.config;
import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import com.fenqile.sgp.business.helper.HippoHelper;
/**
* @author sherrycao
* @version 2019/3/6
*/
public class EsClient {
private static JestClient client;
private static JestClient accessLogClient;
private static JestClient businessLogClient;
private EsClient() {
}
private static void build() {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(HippoHelper.getEsUrls())
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(10000)
.readTimeout(10000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
client = factory.getObject();
}
private static void buildAccessLogClient() {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(HippoHelper.getEsAccessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D")
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(20000)
.readTimeout(20000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
accessLogClient = factory.getObject();
}
private static void buildBusinessLogClient() {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(HippoHelper.getEsBusinessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D")
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(20000)
.readTimeout(20000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
businessLogClient = factory.getObject();
}
public static synchronized JestClient getClient() {
if (client == null) {
build();
}
return client;
}
public static synchronized JestClient getAccessLogClient() {
if (accessLogClient == null) {
buildAccessLogClient();
}
return accessLogClient;
}
public static synchronized JestClient getBusinessLogClient() {
if (businessLogClient == null) {
buildBusinessLogClient();
}
return businessLogClient;
}
}
5、ElasticSearchDao层
package com.fenqile.sgp.business.dao;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* @author sherrycao
* @version 2019/3/11
*/
@Service
public class ElasticSearchDao {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchDao.class);
/**
* 插入文档
* @param document
* @param indexName
* @param typeName
*/
public void insertDocument(JestClient client, Object document, String indexName, String typeName) {
try {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
Index index = new Index.Builder(document).build();
bulk.addAction(index);
BulkResult bulkResult = client.execute(bulk.build());
bulkResult.isSucceeded();
} catch (IOException e) {
LOGGER.error("写入es异常, indexName {}, typeName {}", indexName, typeName);
LOGGER.error("", e);
}
}
/**
* 批量插入文档
* @param documents
* @param indexName
* @param typeName
*/
public void insertDocuments(JestClient client, List<Object> documents, String indexName, String typeName) {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
for (Object document : documents) {
Index index = new Index.Builder(document).build();
bulk.addAction(index);
}
try {
BulkResult bulkResult = client.execute(bulk.build());
bulkResult.isSucceeded();
} catch (IOException e) {
LOGGER.error("批量写入es异常, indexName {}, typeName {}", indexName, typeName);
LOGGER.error("", e);
}
}
/**
* 指定id
* @param client
* @param documentMap
* @param indexName
* @param typeName
*/
public void insertDocuments(JestClient client, Map<String, Object> documentMap, String indexName, String typeName) {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
Iterator documentEntry = documentMap.entrySet().iterator();
while(documentEntry.hasNext()){
Map.Entry<String, Object> entry = (Map.Entry)documentEntry.next();
Index index = new Index.Builder(entry.getValue()).id(entry.getKey()).build();
bulk.addAction(index);
}
try {
BulkResult bulkResult = client.execute(bulk.build());
bulkResult.isSucceeded();
} catch (IOException e) {
LOGGER.error("批量写入es异常, indexName {}, typeName {}", indexName, typeName);
LOGGER.error("", e);
}
}
public SearchResult search(JestClient client, String indexName, String typeName, String query) throws Exception {
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(typeName)
.build();
return client.execute(search);
}
/**
* 使用type查询
* @param client
* @param indexName
* @param typeName
* @param query
* @return
* @throws Exception
*/
public SearchResult search(JestClient client, List<String> indexName, String typeName, String query) throws Exception {
LOGGER.info(query);
Search search = new Search.Builder(query)
.addIndices(indexName)
.addType(typeName)
.build();
LOGGER.info(search.toString());
LOGGER.info(search.getPathToResult());
return client.execute(search);
}
/**
* 不使用type查询
* @param client
* @param indexName
* @param query
* @return
* @throws Exception
*/
public SearchResult search(JestClient client, List<String> indexName, String query) throws Exception {
LOGGER.info(query);
Search search = new Search.Builder(query)
.addIndices(indexName)
.build();
LOGGER.info(search.toString());
LOGGER.info(search.getPathToResult());
return client.execute(search);
}
public Boolean createIndex(JestClient client, String indexName) {
try {
JestResult jr = client.execute(new CreateIndex.Builder(indexName).build());
return jr.isSucceeded();
} catch (IOException e) {
LOGGER.error("", e);
}
return false;
}
public boolean deleteDoc(JestClient client,String indexId, String indexName, String indexType) {
Delete.Builder builder = new Delete.Builder(indexId);
builder.refresh(true);
Delete delete = builder.index(indexName).type(indexType).build();
try {
JestResult result = client.execute(delete);
if (result != null && !result.isSucceeded()) {
throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");
}
} catch (Exception e) {
LOGGER.error("",e);
return false;
}
return true;
}
public boolean deleteDocAll(JestClient client,String indexName, String indexType) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
searchSourceBuilder.query(boolQueryBuilder);
DeleteByQuery.Builder builder = new DeleteByQuery.Builder(searchSourceBuilder.toString());
builder.refresh(true);
DeleteByQuery deleteByQuery = builder.addIndex(indexName).addType(indexType).build();
try {
JestResult result = client.execute(deleteByQuery);
if (result != null && !result.isSucceeded()) {
throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");
}
} catch (Exception e) {
LOGGER.error("",e);
return false;
}
return true;
}
/**
* 删除类型
* @param indexName
* @param indexType
*/
public boolean deleteType(JestClient client, String indexName, String indexType) {
DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).type(indexType).build();
try {
JestResult result = client.execute(deleteIndex);
if (result != null && result.isSucceeded()) {
throw new RuntimeException(result.getErrorMessage()+"删除类型失败!");
}
} catch (Exception e) {
LOGGER.error("",e);
return false;
}
return true;
}
/**
* 删除索引
* @param indexName
*/
public boolean deleteIndex(JestClient client, String indexName) {
DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).build();
try {
JestResult result = client.execute(deleteIndex);
if (result != null && result.isSucceeded()) {
throw new RuntimeException(result.getErrorMessage()+"删除索引失败!");
}
} catch (Exception e) {
LOGGER.error("",e);
return false;
}
return true;
}
/**
* 插入或更新文档
* @param id
* @param indexObject
* @param indexName
* @param indexType
* @return
*/
public boolean insertOrUpdateDoc(JestClient client,String id, Object indexObject, String indexName, String indexType) {
Index.Builder builder = new Index.Builder(indexObject);
builder.id(id);
builder.refresh(true);
Index index = builder.index(indexName).type(indexType).build();
try {
JestResult result = client.execute(index);
if (result != null && !result.isSucceeded()) {
throw new RuntimeException(result.getErrorMessage()+"插入更新索引失败!");
}
} catch (Exception e) {
LOGGER.error("",e);
return false;
}
return true;
}
}
到此 ElasticSearch工具类封装介绍完成。
最后
以上就是现代秀发为你收集整理的ElasticSearch工具类封装的全部内容,希望文章能够帮你解决ElasticSearch工具类封装所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复