当前位置:   article > 正文

Java引入Elasticsearch实现增删改查_resthighlevelclient设置index.mapping.total_fields.li

resthighlevelclient设置index.mapping.total_fields.limit

引入Elasticsearch的jar

  1. <dependency>
  2. <groupId>org.elasticsearch.client</groupId>
  3. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  4. <version>7.6.2</version>
  5. <exclusions>
  6. <exclusion>
  7. <groupId>org.elasticsearch</groupId>
  8. <artifactId>elasticsearch</artifactId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.elasticsearch</groupId>
  14. <artifactId>elasticsearch</artifactId>
  15. <version>7.6.2</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.elasticsearch.plugin</groupId>
  19. <artifactId>delete-by-query</artifactId>
  20. <version>2.4.6</version>
  21. </dependency>

编写Elasticsearch基类

Es的基本信息的获取和封装

  1. package *.*.common.es.common;
  2. import java.util.ArrayList;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. import java.util.StringJoiner;
  6. import org.apache.http.HttpHost;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import *.*.common.config.ConfigGroupFile;
  10. import *.*.common.config.ConfigService;
  11. public class EsConfig {
  12. @SuppressWarnings("unused")
  13. private static final Logger logger = LoggerFactory.getLogger(EsConfig.class);
  14. private static EsConfig config;
  15. /**
  16. * 节点列表
  17. */
  18. private HttpHost[] hosts;
  19. /**
  20. * 用户名
  21. */
  22. private String username;
  23. /**
  24. * 密码
  25. */
  26. private String password;
  27. // 调优参数
  28. /**
  29. * 连接超时
  30. */
  31. private int connTimeout;
  32. /**
  33. * 读写超时
  34. */
  35. private int socketTimeout;
  36. /**
  37. * 请求超时
  38. */
  39. private int connRequestTimeout;
  40. /**
  41. * 允许建立多少个连接,超出以后所有的连接都会进入等待队列
  42. */
  43. private int maxConnTotal;
  44. /**
  45. * 同一个 host:port 最多有几个活跃连接
  46. */
  47. private int maxConnPerRoute;
  48. /**
  49. * 返回 ES 配置信息。
  50. *
  51. * @return ES配置
  52. */
  53. public static EsConfig getConfig() {
  54. if (config == null) {
  55. config = doCreate();
  56. }
  57. return config;
  58. }
  59. /*
  60. * 创建配置实例。
  61. */
  62. private static EsConfig doCreate() {
  63. EsConfig config = new EsConfig();
  64. config.hosts = buildHosts();
  65. config.username = ConfigService.getPersionString("elasticsearch.http.username", ConfigGroupFile.getEsConfig());
  66. config.password = ConfigService.getPersionString("elasticsearch.http.password", ConfigGroupFile.getEsConfig());
  67. // 超时参数
  68. config.connTimeout = ConfigService.getInt("crab.elasticsearch.http.conn_timeout", 5000);
  69. config.socketTimeout = ConfigService.getInt("crab.elasticsearch.http.socket_timeout", 30000);
  70. config.connRequestTimeout = ConfigService.getInt("crab.elasticsearch.http.conn_request_timeout", 5000);
  71. // 连接数
  72. config.maxConnTotal = ConfigService.getInt("crab.elasticsearch.http.max_conn_total", 400);
  73. config.maxConnPerRoute = ConfigService.getInt("crab.elasticsearch.http.max_conn_per_route", 200);
  74. return config;
  75. }
  76. /*
  77. * 装装主机信息。
  78. */
  79. private static HttpHost[] buildHosts() {
  80. String hosts = ConfigService.getPersionString("elasticsearch.http.hosts", ConfigGroupFile.getEsConfig());
  81. if (hosts == null) {
  82. return new HttpHost[0];
  83. }
  84. List<HttpHost> hostList = new ArrayList<>();
  85. for (String host : hosts.split(",")) {
  86. String[] hostPort = host.split(":");
  87. if (hostPort.length == 1) {
  88. hostList.add(new HttpHost(hostPort[0])); // 10.1.1.1
  89. } else if (hostPort.length == 2) {
  90. hostList.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1]))); // 10.1.1.1:9200
  91. } else if (hostPort.length == 3) {
  92. hostList.add(new HttpHost(hostPort[1], Integer.parseInt(hostPort[2]), hostPort[0])); // https:10.1.1.1:9200
  93. }
  94. }
  95. return hostList.toArray(new HttpHost[hostList.size()]);
  96. }
  97. /**
  98. * 获取节点列表。
  99. *
  100. * @return 节点名
  101. */
  102. public static HttpHost[] getHosts() {
  103. return getConfig().hosts;
  104. }
  105. /**
  106. * 获取用户名
  107. *
  108. * @return 用户名
  109. */
  110. public static String getUsername() {
  111. return getConfig().username;
  112. }
  113. /**
  114. * 获取密码。
  115. *
  116. * @return 密码
  117. */
  118. public static String getPassword() {
  119. return getConfig().password;
  120. }
  121. public static int getConnTimeout() {
  122. return getConfig().connTimeout;
  123. }
  124. public static int getSocketTimeout() {
  125. return getConfig().socketTimeout;
  126. }
  127. public static int getConnRequestTimeout() {
  128. return getConfig().connRequestTimeout;
  129. }
  130. public static int getMaxConnTotal() {
  131. return getConfig().maxConnTotal;
  132. }
  133. public static int getMaxConnPerRoute() {
  134. return getConfig().maxConnPerRoute;
  135. }
  136. @Override
  137. public String toString() {
  138. StringJoiner sj = new StringJoiner(", ", "{", "}");
  139. sj.add("hosts:" + Arrays.toString(hosts));
  140. sj.add("username:" + username);
  141. sj.add("connTimeout:" + connTimeout);
  142. sj.add("socketTimeout:" + socketTimeout);
  143. sj.add("connRequestTimeout:" + connRequestTimeout);
  144. sj.add("maxConnTotal:" + maxConnTotal);
  145. sj.add("maxConnPerRoute:" + maxConnPerRoute);
  146. return sj.toString();
  147. }
  148. }
  1. package *.*.common.es.common;
  2. import java.io.IOException;
  3. import org.apache.http.auth.AuthScope;
  4. import org.apache.http.auth.UsernamePasswordCredentials;
  5. import org.apache.http.impl.client.BasicCredentialsProvider;
  6. import org.elasticsearch.client.Node;
  7. import org.elasticsearch.client.RestClient;
  8. import org.elasticsearch.client.RestClientBuilder;
  9. import org.elasticsearch.client.RestHighLevelClient;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import *.*.api.error.ServerException;
  13. public class EsClients {
  14. private static final Logger logger = LoggerFactory.getLogger(EsClients.class);
  15. private RestHighLevelClient client;
  16. // 单例
  17. private static class Singleton {
  18. public static final EsClients INSTANCE = new EsClients();
  19. private Singleton() {}
  20. }
  21. private EsClients() {
  22. client = new RestHighLevelClient(createClientBuilder());
  23. }
  24. public void reBuildEsClients() {
  25. if (client != null) {
  26. try {
  27. client.close();
  28. } catch (Exception e) {
  29. logger.debug(e.getMessage(), e);
  30. }
  31. }
  32. client = new RestHighLevelClient(createClientBuilder());
  33. }
  34. public static RestHighLevelClient getClient(){
  35. return Singleton.INSTANCE.client;
  36. }
  37. /**
  38. * ES 执行操作请求的回调类。
  39. */
  40. @FunctionalInterface
  41. public interface ClientRequest<T> {
  42. T doRequest(RestHighLevelClient client) throws IOException;
  43. }
  44. /**
  45. * 执行 ES 的操作请求。
  46. *
  47. * @param request 操作的请求
  48. * @return 执行结果
  49. */
  50. public static <T> T executeRequest(ClientRequest<T> request) {
  51. try {
  52. return request.doRequest(Singleton.INSTANCE.client);
  53. } catch (Exception e) {
  54. logger.error(e.getMessage());
  55. String message = e.getMessage();
  56. if (message != null && message.contains("I/O reactor status: STOPPED")) {
  57. return reQuery(request);
  58. }
  59. throw new ServerException("", e);
  60. }
  61. }
  62. private static <T> T reQuery(ClientRequest<T> request) {
  63. logger.warn("we encounter a problem I/O reactor status: STOPPED");
  64. Singleton.INSTANCE.reBuildEsClients();
  65. try {
  66. return request.doRequest(Singleton.INSTANCE.client);
  67. } catch (Exception e) {
  68. throw new ServerException("new query", e);
  69. }
  70. }
  71. /*
  72. * 构造 builder 。
  73. */
  74. private static RestClientBuilder createClientBuilder() {
  75. RestClientBuilder builder = RestClient.builder(EsConfig.getHosts());
  76. // 连接超时时间
  77. builder.setRequestConfigCallback(requestConfigBuilder -> {
  78. requestConfigBuilder.setConnectTimeout(EsConfig.getConnTimeout());
  79. requestConfigBuilder.setSocketTimeout(EsConfig.getSocketTimeout());
  80. requestConfigBuilder.setConnectionRequestTimeout(EsConfig.getConnRequestTimeout());
  81. return requestConfigBuilder;
  82. });
  83. // 连接信息
  84. builder.setHttpClientConfigCallback(httpClientBuilder -> {
  85. httpClientBuilder.setMaxConnTotal(EsConfig.getMaxConnTotal());
  86. httpClientBuilder.setMaxConnPerRoute(EsConfig.getMaxConnPerRoute());
  87. final BasicCredentialsProvider provider = new BasicCredentialsProvider();
  88. provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(EsConfig.getUsername(), EsConfig.getPassword()));
  89. return httpClientBuilder.setDefaultCredentialsProvider(provider);
  90. });
  91. builder.setFailureListener(new RestClient.FailureListener() {
  92. @Override
  93. public void onFailure(Node node) {
  94. logger.debug("failed to execute on node: {}", node);
  95. }
  96. });
  97. return builder;
  98. }
  99. }

ES索引的Setting统一设置 ,包括分片信息,数据集,map的字段个数,是否开启大消息忽略

  1. package *.*.common.es.entity;
  2. import *.*.common.config.ConfigService;
  3. /**
  4. *
  5. * @author zhanglb
  6. * Create at 2022年2月17日 下午5:37:26
  7. */
  8. public class EsSettingEntity {
  9. /**
  10. * 分片信息
  11. */
  12. private int indexShard = 3;
  13. /**
  14. * 数据集
  15. */
  16. private int indexReplicas = 2;
  17. /**
  18. * map的字段个数
  19. */
  20. private int fieldLength = 5000;//默认5000
  21. /**
  22. * 是否开启大小写忽略
  23. */
  24. private boolean openLowercase = ConfigService.getInstance().isOpenLowercase();
  25. public EsSettingEntity() {
  26. }
  27. public EsSettingEntity(int indexShard, int indexReplicas, boolean openLowercase) {
  28. this.indexShard = indexShard;
  29. this.indexReplicas = indexReplicas;
  30. this.openLowercase = openLowercase;
  31. }
  32. /**
  33. * @return indexShard
  34. */
  35. public int getIndexShard() {
  36. return indexShard;
  37. }
  38. /**
  39. * @param indexShard
  40. */
  41. public void setIndexShard(int indexShard) {
  42. this.indexShard = indexShard;
  43. }
  44. /**
  45. * @return indexReplicas
  46. */
  47. public int getIndexReplicas() {
  48. return indexReplicas;
  49. }
  50. /**
  51. * @param indexReplicas
  52. */
  53. public void setIndexReplicas(int indexReplicas) {
  54. this.indexReplicas = indexReplicas;
  55. }
  56. /**
  57. * @return fieldLength
  58. */
  59. public int getFieldLength() {
  60. return fieldLength;
  61. }
  62. /**
  63. * @param fieldLength
  64. */
  65. public void setFieldLength(int fieldLength) {
  66. this.fieldLength = fieldLength;
  67. }
  68. /**
  69. * @return openLowercase
  70. */
  71. public boolean isOpenLowercase() {
  72. return openLowercase;
  73. }
  74. /**
  75. * @param openLowercase
  76. */
  77. public void setOpenLowercase(boolean openLowercase) {
  78. this.openLowercase = openLowercase;
  79. }
  80. }

ES Setting的初始化公共类 

  1. package *.*.common.es.support;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.elasticsearch.common.settings.Settings;
  5. import *.*.common.es.entity.EsSettingEntity;
  6. /**
  7. *
  8. * @author zhanglb
  9. * Create at 2022年2月17日 下午5:14:23
  10. */
  11. public class EsSetting {
  12. public EsSetting() {
  13. }
  14. /**
  15. * 初始化setting
  16. * @param entity
  17. * @return
  18. */
  19. public static Settings.Builder initSetting(EsSettingEntity entity) {
  20. Settings.Builder setting = Settings.builder().put("index.number_of_shards", entity.getIndexShard()).put("index.number_of_replicas", entity.getIndexReplicas())
  21. .put("index.mapping.total_fields.limit", entity.getFieldLength());
  22. if (entity.isOpenLowercase()) {
  23. setLowercase(setting);
  24. }
  25. return setting;
  26. }
  27. /**
  28. * 设置大小写不敏感
  29. * @param setting
  30. * @return
  31. */
  32. public static Settings.Builder setLowercase(Settings.Builder setting) {
  33. List<String> filter = new ArrayList<>();
  34. filter.add("lowercase");
  35. setting.put("index.analysis.normalizer.lowercase.type",EsActivityMapping.FIELD_TYPE_CUSTOM).putList("index.analysis.normalizer.lowercase.filter", filter);
  36. return setting;
  37. }
  38. }

ES查询 数据的基础类

  1. package *.*.common.es.support;
  2. import java.util.List;
  3. import java.util.concurrent.TimeUnit;
  4. import org.elasticsearch.ElasticsearchException;
  5. import org.elasticsearch.action.search.SearchRequest;
  6. import org.elasticsearch.action.search.SearchResponse;
  7. import org.elasticsearch.client.RequestOptions;
  8. import org.elasticsearch.client.core.CountRequest;
  9. import org.elasticsearch.common.unit.TimeValue;
  10. import org.elasticsearch.search.builder.SearchSourceBuilder;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import *.*.api.error.ClientException;
  14. import *.*.api.error.ServerException;
  15. import *.*.common.config.ConfigService;
  16. import *.*.common.es.common.EsClients;
  17. import *.*.i18n.source.error.I18nErrorMessages;
  18. /**
  19. *
  20. * 从Es中查询数据工具类
  21. *
  22. * @author zhanglb
  23. *
  24. */
  25. public class EsDataReader {
  26. private static final Logger logger = LoggerFactory.getLogger(EsDataReader.class);
  27. private static boolean UNIT_TEST = ConfigService.getBoolean("crab.unit.test.open", false);
  28. public EsDataReader() {
  29. }
  30. /**
  31. * 执行 count 请求。
  32. *
  33. * @param queryParams 过滤条件
  34. * @return 累计数
  35. */
  36. public static long count(List<String> index, SearchSourceBuilder sourceBuilder) {
  37. if (UNIT_TEST) {
  38. logger.debug("es count has been suppressed for unit test.");
  39. return 0l;
  40. }
  41. try {
  42. CountRequest countReq = new CountRequest(index.toArray(new String[index.size()]), sourceBuilder);
  43. return EsClients.executeRequest(client -> client.count(countReq, RequestOptions.DEFAULT).getCount());
  44. } catch (ElasticsearchException e) {
  45. logger.error("", e);
  46. throw new ServerException("Failed to execute count: " + e.getMessage(), null);
  47. }
  48. }
  49. /**
  50. * 根据指定的查询条件,执行数据操作。
  51. *
  52. * @param queryParams 查询条件对象
  53. * @return 查询结果
  54. */
  55. public static SearchResponse query(List<String> index, SearchSourceBuilder sourceBuilder) {
  56. if (UNIT_TEST) {
  57. logger.debug("es query has been suppressed for unit test.");
  58. return null;
  59. }
  60. SearchResponse response;
  61. SearchRequest request;
  62. try {
  63. // 设置超时时间
  64. sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
  65. request = new SearchRequest(index.toArray(new String[index.size()]), sourceBuilder);
  66. if (logger.isDebugEnabled()) {
  67. logger.debug("query with condition: {}", request.source().query());
  68. }
  69. response = EsClients.executeRequest(client -> client.search(request, RequestOptions.DEFAULT));
  70. } catch (ElasticsearchException ee) {
  71. Throwable cause = ee.getCause();
  72. if (cause != null && cause.getMessage().contains("Result window is too large")) {
  73. logger.debug("", ee);
  74. throw ClientException.create(I18nErrorMessages.ERROR_QUERY_FAIELD_NEED_LIMIT);
  75. } else {
  76. logger.error("", ee);
  77. throw new ServerException("Failed to execute query: " + ee.getMessage(), null);
  78. }
  79. }
  80. if (response.isTimedOut()) {
  81. throw new ServerException("Query with timeout", null);
  82. }
  83. return response;
  84. }
  85. }

ES的写入数据的基础类 

  1. package *.*.common.es.support;
  2. import java.util.List;
  3. import java.util.Map;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.elasticsearch.action.ActionListener;
  6. import org.elasticsearch.action.bulk.BulkItemResponse;
  7. import org.elasticsearch.action.bulk.BulkRequest;
  8. import org.elasticsearch.action.bulk.BulkResponse;
  9. import org.elasticsearch.action.delete.DeleteRequest;
  10. import org.elasticsearch.action.index.IndexRequest;
  11. import org.elasticsearch.action.update.UpdateRequest;
  12. import org.elasticsearch.client.RequestOptions;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import *.*.common.config.ConfigService;
  16. import *.*.common.es.common.EsClients;
  17. import *.*.common.es.common.EsIndexUtils;
  18. import *.*.common.util.ConstantFields;
  19. /**
  20. * 把动态记录写入到es中工具类
  21. *
  22. * @author zhanglb
  23. *
  24. */
  25. public class EsDataWriter {
  26. private static final Logger logger = LoggerFactory.getLogger(EsDataWriter.class);
  27. private static boolean UNIT_TEST = ConfigService.getBoolean("crab.unit.test.open", false);
  28. public EsDataWriter() {
  29. }
  30. public static boolean sync(String index, List<Map<String, Object>> map) {
  31. if (UNIT_TEST) {
  32. logger.debug("es sync has been suppressed for unit test.");
  33. return true;
  34. }
  35. if (StringUtils.isBlank(index) || map == null)
  36. return false;
  37. boolean storeIndex = false;
  38. if (!map.isEmpty()) {
  39. if (!EsIndexUtils.exists(index)) {
  40. logger.info("create index:" + index);
  41. EsIndexUtils.createIndex(index, null, null, EsActivityMapping.buildActivityMapping());
  42. storeIndex = true;
  43. }
  44. EsDataWriter.createObjectsAsync(index, map);
  45. }
  46. return storeIndex;
  47. }
  48. /**
  49. * 写操作类型。
  50. */
  51. public enum WriteAction {
  52. ADD, UPDATE, UPSERT, DELETE;
  53. }
  54. /**
  55. * 异步创建资源对象。
  56. *
  57. * @param objects 待新建的资源对象
  58. */
  59. public static void createObjectsAsync(String index, List<Map<String, Object>> objects) {
  60. if (UNIT_TEST) {
  61. logger.debug("es create object has been suppressed for unit test.");
  62. return;
  63. }
  64. writerObjectsAsync(index, objects, WriteAction.ADD);
  65. }
  66. /**
  67. * 异步更新资源对象。
  68. *
  69. * @param objects 待更新的资源对象(只需提供 ID 以及 变更的字段)
  70. */
  71. public static void updateObjectsAsync(String index, List<Map<String, Object>> objects) {
  72. if (UNIT_TEST) {
  73. logger.debug("es update object has been suppressed for unit test.");
  74. return;
  75. }
  76. writerObjectsAsync(index, objects, WriteAction.UPDATE);
  77. }
  78. /**
  79. * 异步保存(根据 ID 判断新建或更新)资源对象。
  80. *
  81. * @param objects 待保存的资源对象
  82. */
  83. public static void saveObjectsAsync(String index, List<Map<String, Object>> objects) {
  84. if (UNIT_TEST) {
  85. logger.debug("es save object has been suppressed for unit test.");
  86. return;
  87. }
  88. writerObjectsAsync(index, objects, WriteAction.UPSERT);
  89. }
  90. /**
  91. * 异步保存(根据 ID 判断新建或更新)资源对象。
  92. *
  93. * @param objects 待保存的资源对象
  94. */
  95. public static void delObjectsAsync(String index, List<Map<String, Object>> objects) {
  96. if (UNIT_TEST) {
  97. logger.debug("es delete object has been suppressed for unit test.");
  98. return;
  99. }
  100. writerObjectsAsync(index, objects, WriteAction.DELETE);
  101. }
  102. /**
  103. * 以异步的形式批量写资源对象。
  104. *
  105. * @param writeRequests 写请求
  106. */
  107. public static void writerObjectsAsync(String index, List<Map<String, Object>> writeRequests, WriteAction action) {
  108. if (UNIT_TEST) {
  109. logger.debug("es write object has been suppressed for unit test.");
  110. return;
  111. }
  112. if (writeRequests == null || writeRequests.isEmpty())
  113. return;
  114. final BulkRequest request = new BulkRequest();
  115. for (Map<String, Object> writeRequest : writeRequests) {
  116. switch (action) {
  117. case ADD:
  118. request.add(buildCreateRequest(index, writeRequest));
  119. break;
  120. case UPDATE:
  121. UpdateRequest reqUp = buildUpdateRequest(index, writeRequest, true);
  122. if (reqUp == null)
  123. break;
  124. request.add(reqUp);
  125. break;
  126. case UPSERT:
  127. UpdateRequest reqU = buildUpdateRequest(index, writeRequest, true);
  128. if (reqU == null)
  129. break;
  130. request.add(reqU);
  131. break;
  132. case DELETE:
  133. DeleteRequest reqD = buildDeleteRequest(index, writeRequest.get(ConstantFields.FIELD_ID));
  134. if (reqD == null)
  135. break;
  136. request.add(reqD);
  137. break;
  138. }
  139. }
  140. EsClients.executeRequest(client -> {
  141. client.bulkAsync(request, RequestOptions.DEFAULT, new BulkWriteCallback());
  142. return null;
  143. });
  144. }
  145. /*
  146. * 新建资源请求。
  147. *
  148. * @return 请求对象
  149. */
  150. private static IndexRequest buildCreateRequest(String index, Map<String, Object> map) {
  151. IndexRequest create = new IndexRequest(index);
  152. if (map.containsKey(ConstantFields.FIELD_ID)) {
  153. create.id(map.get(ConstantFields.FIELD_ID).toString());
  154. }
  155. create.source(map);
  156. return create;
  157. }
  158. /*
  159. * 更新资源请求。
  160. *
  161. * @param upsert 如果不存在是否新建
  162. *
  163. * @return 请求对象
  164. */
  165. private static UpdateRequest buildUpdateRequest(String index, Map<String, Object> map, boolean upsert) {
  166. if (!map.containsKey(ConstantFields.FIELD_ID)) {
  167. return null;
  168. }
  169. UpdateRequest update = new UpdateRequest(index, map.get(ConstantFields.FIELD_ID).toString()).retryOnConflict(3);
  170. update.docAsUpsert(upsert);
  171. update.doc(map);
  172. return update;
  173. }
  174. /*
  175. * 删除资源请求。
  176. *
  177. * @return 请求对象
  178. */
  179. private static DeleteRequest buildDeleteRequest(String index, Object id) {
  180. if (id == null)
  181. return null;
  182. return new DeleteRequest(index, id.toString());
  183. }
  184. /*
  185. * 回调。
  186. */
  187. private static class BulkWriteCallback implements ActionListener<BulkResponse> {
  188. @Override
  189. public void onFailure(Exception e) {
  190. logger.error("Write data with error", e);
  191. }
  192. @Override
  193. public void onResponse(BulkResponse bulkItemResponses) {
  194. if (bulkItemResponses.hasFailures()) {
  195. processError(bulkItemResponses.getItems());
  196. } else {
  197. logger.debug("Write data successfully, count: {}", bulkItemResponses.getItems().length);
  198. }
  199. }
  200. private void processError(BulkItemResponse[] responses) {
  201. for (BulkItemResponse response : responses) {
  202. if (!response.isFailed() || StringUtils.contains(response.getFailureMessage(), "document_missing_exception")) {
  203. continue;
  204. }
  205. logger.error("Write data with error: {}", response.getFailureMessage());
  206. }
  207. }
  208. }
  209. }

ES创建索引,修改别名等公共方法类 

  1. package *.*.common.es.common;
  2. import java.util.Collections;
  3. import java.util.List;
  4. import java.util.Set;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.elasticsearch.ElasticsearchException;
  7. import org.elasticsearch.action.admin.indices.alias.Alias;
  8. import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
  9. import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
  10. import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  11. import org.elasticsearch.action.support.master.AcknowledgedResponse;
  12. import org.elasticsearch.client.GetAliasesResponse;
  13. import org.elasticsearch.client.RequestOptions;
  14. import org.elasticsearch.client.indices.CreateIndexRequest;
  15. import org.elasticsearch.client.indices.CreateIndexResponse;
  16. import org.elasticsearch.client.indices.GetIndexRequest;
  17. import org.elasticsearch.client.indices.PutIndexTemplateRequest;
  18. import org.elasticsearch.common.settings.Settings;
  19. import org.elasticsearch.common.xcontent.XContentBuilder;
  20. import org.elasticsearch.rest.RestStatus;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. import *.*.common.config.ConfigService;
  24. import *.*.common.es.entity.EsSettingEntity;
  25. import *.*.common.es.support.EsSetting;
  26. import *.*.common.util.ObjectUtils;
  27. import *.*.api.error.ServerException;
  28. public class EsIndexUtils {
  29. private static final Logger logger = LoggerFactory.getLogger(EsIndexUtils.class);
  30. private static boolean UNIT_TEST = ConfigService.getBoolean("crab.unit.test.open", false);
  31. // 是否删除旧索引
  32. private static boolean deleteOldIndex = ConfigService.getBoolean("crab.es.delete.old.index", true);
  33. private static final String INDEX_PREX = "*_activity_";
  34. private EsIndexUtils() {
  35. }
  36. /**
  37. * 创建指定的 index
  38. *
  39. * @param idxName 索引名
  40. * @param aliasName 索引的别名
  41. * @param mappingBuilder 索引的映射配置
  42. */
  43. public static void createIndex(String idxName, String aliasName, Settings.Builder setting, XContentBuilder mappingBuilder) {
  44. if (UNIT_TEST) {
  45. logger.debug("es create index has been suppressed for unit test.");
  46. return;
  47. }
  48. //当setting为null时,初始化默认的setting
  49. if (setting == null) {
  50. setting = EsSetting.initSetting(new EsSettingEntity());
  51. }
  52. CreateIndexRequest request = new CreateIndexRequest(idxName);
  53. request.settings(setting);
  54. if (aliasName != null) {
  55. request.alias(new Alias(aliasName));
  56. }
  57. if (mappingBuilder != null) {
  58. request.mapping(mappingBuilder);
  59. }
  60. CreateIndexResponse response = EsClients.executeRequest(client -> client.indices().create(request, RequestOptions.DEFAULT));
  61. if (!response.isAcknowledged()) {
  62. logger.error("Failed to create index '{}' with alias: {}", idxName, aliasName);
  63. }
  64. }
  65. /**
  66. * 创建指定的 index
  67. *
  68. * @param idxName 索引名
  69. */
  70. public static void createIndexByTemplate(String idxName) {
  71. CreateIndexRequest request = new CreateIndexRequest(idxName);
  72. CreateIndexResponse response = EsClients.executeRequest(client -> client.indices().create(request,
  73. RequestOptions.DEFAULT));
  74. if (!response.isAcknowledged()) {
  75. logger.error("Failed to create index '{}'", idxName);
  76. }
  77. }
  78. /**
  79. * 迁移 index 的别名到新的。
  80. *
  81. * @param indexAlias 索引别名
  82. * @param oldIndexName 旧索引
  83. * @param newIndexName 新索引
  84. */
  85. public static void aliases(String indexAlias, String oldIndexName, String newIndexName) {
  86. if (UNIT_TEST) {
  87. logger.debug("es aliases name has been suppressed for unit test.");
  88. return;
  89. }
  90. IndicesAliasesRequest request = new IndicesAliasesRequest();
  91. // 先移除,再添加
  92. request.addAliasAction(
  93. new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE).index(oldIndexName).alias(indexAlias));
  94. request.addAliasAction(
  95. new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD).index(newIndexName).alias(indexAlias));
  96. AcknowledgedResponse response = EsClients.executeRequest(client -> client.indices().updateAliases(request, RequestOptions.DEFAULT));
  97. if (!response.isAcknowledged()) {
  98. logger.error("Failed to aliases index: {}", indexAlias);
  99. }
  100. }
  101. /**
  102. * 设置索引别名。
  103. *
  104. * @param indexAlias 索引别名
  105. * @param newIndexName 新索引
  106. * @param dropOldIndeces 被删除的别名所关联的索引
  107. */
  108. public static Set<String> aliases(String indexAlias, String newIndexName, boolean dropOldIndeces) {
  109. Set<String> oldIndices = Collections.emptySet();
  110. if (dropOldIndeces) {
  111. oldIndices = getIndexByAlias(indexAlias);
  112. }
  113. if (!oldIndices.isEmpty()) {
  114. oldIndices.remove(newIndexName); // 保留最新创建的索引
  115. }
  116. IndicesAliasesRequest request = new IndicesAliasesRequest();
  117. // 关联新索引
  118. request.addAliasAction(new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
  119. .index(newIndexName).alias(indexAlias).writeIndex(true)); // 将新增的索引设置为可写
  120. // 删除指定索引
  121. if (!oldIndices.isEmpty()) {
  122. request.addAliasAction(new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX)
  123. .indices(oldIndices.toArray(new String[oldIndices.size()])));
  124. }
  125. AcknowledgedResponse response = EsClients.executeRequest(client -> client.indices().updateAliases(request, RequestOptions.DEFAULT));
  126. if (!response.isAcknowledged()) {
  127. logger.error("Failed to aliases index: {}", indexAlias);
  128. return Collections.emptySet();
  129. }
  130. // 删除旧索引
  131. deleteOldIndexes(oldIndices);
  132. return oldIndices;
  133. }
  134. private static void deleteOldIndexes(Set<String> oldIndexes) {
  135. if (ObjectUtils.isNullOrEmpty(oldIndexes) || !deleteOldIndex) {
  136. return;
  137. }
  138. for (String oldIndex : oldIndexes) {
  139. dropIndex(oldIndex, true);
  140. }
  141. }
  142. /**
  143. * 返回指定别名所关联的所有索引。
  144. *
  145. * @param indexAlias 索引别名
  146. * @return 别名所关联的索引列表
  147. */
  148. public static Set<String> getIndexByAlias(String indexAlias) {
  149. GetAliasesRequest getAliasReq = new GetAliasesRequest();
  150. getAliasReq.aliases(indexAlias);
  151. GetAliasesResponse getAliasRsp = EsClients.executeRequest(client -> client.indices().getAlias(getAliasReq, RequestOptions.DEFAULT));
  152. return getAliasRsp.getAliases().keySet();
  153. }
  154. /**
  155. * 判断指定的 index 是否存在。
  156. *
  157. * @param indexName 索引名
  158. * @return 如果存在返回 true,否则返回 false
  159. */
  160. public static boolean exists(String indexName) {
  161. if (UNIT_TEST) {
  162. logger.debug("es check index has been suppressed for unit test.");
  163. return false;
  164. }
  165. GetIndexRequest getRequest = new GetIndexRequest(indexName);
  166. return EsClients.executeRequest(client -> client.indices().exists(getRequest, RequestOptions.DEFAULT));
  167. }
  168. /**
  169. * 删除指定的 index
  170. *
  171. * @param indexName 索引名
  172. */
  173. public static void dropIndex(String indexName, boolean ignoreIfNotExists) {
  174. if (UNIT_TEST) {
  175. logger.debug("es drop index has been suppressed for unit test.");
  176. return;
  177. }
  178. try {
  179. DeleteIndexRequest deleteRequest = new DeleteIndexRequest(indexName);
  180. EsClients.executeRequest(client -> client.indices().delete(deleteRequest, RequestOptions.DEFAULT));
  181. } catch (ElasticsearchException e) {
  182. if (ignoreIfNotExists && e.status() == RestStatus.NOT_FOUND) {
  183. return; // ignore
  184. }
  185. logger.error("Failed to drop index: " + indexName, e);
  186. throw new ServerException("Failed to drop index: " + indexName + ", reason: " + e.getMessage(), null);
  187. }
  188. }
  189. public static String fillIndex(String index) {
  190. if (StringUtils.isBlank(index))
  191. return null;
  192. StringBuffer sb = new StringBuffer(INDEX_PREX);
  193. if (index.startsWith(INDEX_PREX)) {
  194. return index;
  195. }
  196. return sb.append(index).toString();
  197. }
  198. /**
  199. * 创建指定的 index 模板。
  200. *
  201. * @param tplName 索引模板名
  202. * @param idxAliasName 索引的别名
  203. * @param patterns 被应用模板的索引模式
  204. * @param mappingBuilder 模板的索引映射配置
  205. */
  206. public static void createIndexTemplate(String tplName, String idxAliasName, List<String> patterns,
  207. XContentBuilder mappingBuilder) {
  208. PutIndexTemplateRequest request = new PutIndexTemplateRequest(tplName);
  209. request.settings(getIndexSettings(idxAliasName)).patterns(patterns);
  210. if (idxAliasName != null) {
  211. request.alias(new Alias(idxAliasName));
  212. }
  213. if (mappingBuilder != null) {
  214. request.mapping(mappingBuilder);
  215. }
  216. AcknowledgedResponse response = EsClients.executeRequest(client -> client.indices().putTemplate(request,
  217. RequestOptions.DEFAULT));
  218. if (!response.isAcknowledged()) {
  219. logger.error("Failed to create index template'{}' with alias: {}", tplName, idxAliasName);
  220. }
  221. }
  222. /*
  223. * 返回索引的定义。
  224. */
  225. private static Settings.Builder getIndexSettings(String aliasName) {
  226. final String paramPrefix = "pacific.es.index.";
  227. int shards = ConfigService.getInt(paramPrefix + aliasName + ".number_of_shards", 3);
  228. int replicas = ConfigService.getInt(paramPrefix + aliasName + ".number_of_replicas", 2);
  229. int fields = ConfigService.getInt(paramPrefix + aliasName + ".total_fields", 5000);
  230. return Settings.builder().put("index.number_of_shards", shards)
  231. .put("index.number_of_replicas", replicas).put("index.mapping.total_fields.limit", fields);
  232. }
  233. }

Elasticsearch索引动态字段基础类

  1. package *.*.common.es.support;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Map.Entry;
  6. import java.util.stream.Collectors;
  7. import java.util.stream.Stream;
  8. import org.elasticsearch.common.xcontent.XContentBuilder;
  9. import org.elasticsearch.common.xcontent.XContentFactory;
  10. import *.*.common.config.ConfigService;
  11. import *.*.common.util.ConstantFields;
  12. import *.*.api.error.ServerException;
  13. /**
  14. * Es中对应的几种动态记录的mapping
  15. *
  16. * @author zhanglb
  17. *
  18. */
  19. public class EsActivityMapping {
  20. public static final String FIELD_TYPE_KEYWORD = "keyword";
  21. public static final String FIELD_TYPE_DATE = "date";
  22. public static final String FIELD_TYPE_TEXT = "text";
  23. public static final String FIELD_TYPE_LONG = "long";
  24. public static final String FIELD_TYPE_OBJECT = "object";
  25. public static final String FIELD_TYPE_CUSTOM = "custom";
  26. public static final List<String> NEED_LOWERCASE_TYPE = Stream.of(FIELD_TYPE_KEYWORD, FIELD_TYPE_TEXT).collect(Collectors.toList());
  27. public static final String CONTENT = "Total-OAO-context-OBO";
  28. public static final boolean OPEN_LOWERCASE = ConfigService.getInstance().isOpenLowercase();
  29. private EsActivityMapping() {
  30. }
  31. public static XContentBuilder buildMapping(Map<String, String> propertiesMap) {
  32. try (XContentBuilder builder = XContentFactory.jsonBuilder();) {
  33. builder.startObject();
  34. {
  35. builder.startObject("properties");
  36. if (propertiesMap != null && !propertiesMap.isEmpty()) {
  37. for (Entry<String, String> entry : propertiesMap.entrySet()) {
  38. buildField(builder, entry.getKey(), entry.getValue());
  39. }
  40. }
  41. builder.endObject();
  42. }
  43. return builder.endObject();
  44. } catch (IOException e) {
  45. throw new ServerException("Failed to build index ci mapping", e);
  46. }
  47. }
  48. public static XContentBuilder buildCIMapping() {
  49. try (XContentBuilder builder = XContentFactory.jsonBuilder();) {
  50. builder.startObject();
  51. {
  52. builder.startObject("properties");
  53. buildField(builder, CONTENT, FIELD_TYPE_KEYWORD);
  54. buildField(builder, ConstantFields.FIELD_ID, FIELD_TYPE_KEYWORD);
  55. buildField(builder, ConstantFields.FIELD_CLASSCODE, FIELD_TYPE_KEYWORD);
  56. buildField(builder, ConstantFields.RES_CIRCLE_FIELD, FIELD_TYPE_KEYWORD);
  57. buildField(builder, ConstantFields.CREATE_TIME, FIELD_TYPE_DATE);
  58. buildField(builder, ConstantFields.UPDATE_TIME, FIELD_TYPE_DATE);
  59. buildField(builder, ConstantFields.RES_ATTRVALUES, FIELD_TYPE_OBJECT);
  60. builder.endObject();
  61. }
  62. return builder.endObject();
  63. } catch (IOException e) {
  64. throw new ServerException("Failed to build index ci mapping", e);
  65. }
  66. }
  67. /**
  68. * 创建配置项动态记录字段映射。
  69. */
  70. public static XContentBuilder buildActivityMapping() {
  71. try (XContentBuilder builder = XContentFactory.jsonBuilder();) {
  72. builder.startObject();
  73. {
  74. builder.startObject("properties");
  75. buildField(builder, ConstantFields.ACTIVITY_FIELD_ID, FIELD_TYPE_KEYWORD);
  76. buildField(builder, ConstantFields.ACTIVITY_FIELD_CLASSCODE, FIELD_TYPE_KEYWORD);
  77. buildField(builder, ConstantFields.ACTIVITY_FIELD_CLASSNAME, FIELD_TYPE_KEYWORD);
  78. buildField(builder, ConstantFields.ACTIVITY_FIELD_ACTYPE, FIELD_TYPE_KEYWORD);
  79. buildField(builder, ConstantFields.FIELD_TYPE, FIELD_TYPE_KEYWORD);
  80. buildField(builder, ConstantFields.ACTIVITY_FIELD_TIME, FIELD_TYPE_DATE);
  81. buildField(builder, ConstantFields.ACTIVITY_FIELD_TENANTID, FIELD_TYPE_KEYWORD);
  82. buildField(builder, ConstantFields.ACTIVITY_FIELD_VERSION, FIELD_TYPE_LONG);
  83. buildField(builder, ConstantFields.ACTIVITY_FIELD_CIRCLEID, FIELD_TYPE_KEYWORD);
  84. buildField(builder, ConstantFields.FIELD_USERID, FIELD_TYPE_KEYWORD);
  85. buildField(builder, ConstantFields.ACTIVITY_FIELD_USERNAME, FIELD_TYPE_TEXT);
  86. buildField(builder, ConstantFields.ACTIVITY_FIELD_OPER, FIELD_TYPE_KEYWORD);
  87. buildField(builder, ConstantFields.ACTIVITY_FIELD_OPERNAME, FIELD_TYPE_KEYWORD);
  88. buildField(builder, ConstantFields.ACTIVITY_FIELD_ENTITYID, FIELD_TYPE_KEYWORD);
  89. buildField(builder, ConstantFields.ACTIVITY_FIELD_ENTITYNAME, FIELD_TYPE_TEXT);
  90. buildField(builder, ConstantFields.ACTIVITY_FIELD_DESC, FIELD_TYPE_TEXT);
  91. buildField(builder, ConstantFields.ACTIVITY_FIELD_PARENTCINAME, FIELD_TYPE_TEXT);
  92. buildField(builder, ConstantFields.ACTIVITY_FIELD_TARGETCINAME, FIELD_TYPE_TEXT);
  93. buildField(builder, ConstantFields.ACTIVITY_FIELD_ITSMINFO, FIELD_TYPE_OBJECT);
  94. buildField(builder, ConstantFields.ACTIVITY_FIELD_RELATIONNAME, FIELD_TYPE_TEXT);
  95. buildField(builder, ConstantFields.CI_RECORD_FIELD_SOURCE, FIELD_TYPE_KEYWORD);
  96. buildField(builder, ConstantFields.FIELD_STATE, FIELD_TYPE_KEYWORD);
  97. buildField(builder, ConstantFields.CI_RECORD_FIELD_AUTO_CLAIM, FIELD_TYPE_KEYWORD);
  98. builder.endObject();
  99. }
  100. return builder.endObject();
  101. } catch (IOException e) {
  102. throw new ServerException("Failed to build index mapping", e);
  103. }
  104. }
  105. private static void buildField(XContentBuilder builder, String field, String type) throws IOException {
  106. builder.startObject(field);
  107. {
  108. builder.field("type", type);
  109. if (OPEN_LOWERCASE && NEED_LOWERCASE_TYPE.contains(type)) {
  110. builder.field("normalizer", "lowercase");
  111. }
  112. }
  113. builder.endObject();
  114. }
  115. }

Elasticsearch的资源增删改查

写入数据

  1. /**
  2. *
  3. * @param list
  4. * @param testData
  5. * @param syncByAlias 是否用别名
  6. */
  7. @SuppressWarnings({ "unchecked" })
  8. public void insertDates(List<Test> list, CommonTestData testData, boolean syncByAlias) {
  9. ObjectMapper mapper = new ObjectMapper();
  10. String id = null;
  11. StringJoiner content;
  12. Map<String, TestClass> testClassMap = testData.getClassMap();
  13. Map<String, TestAttribute> testAttributeMap = testData.getAttrMap();
  14. if (resClassMap.isEmpty()) {
  15. return;
  16. }
  17. try {
  18. List<Map<String, Object>> objects = new ArrayList<>();
  19. for (Test entity : list) {
  20. TestClass testClass = testClassMap.get(entity.getClassCode());
  21. if (testClass == null) {
  22. continue;
  23. }
  24. List<TestAttribute> attrs = new ArrayList<>();
  25. for (String code : testClass.getAttrCodes()) {
  26. TestAttribute attr = testAttributeMap.get(code);
  27. if (attr != null) {
  28. attrs.add(attr);
  29. }
  30. }
  31. content = new StringJoiner(ElasticsearchUtil.SPLIT_SYMBOL);
  32. String json = mapper.writeValueAsString(ElasticsearchUtil.converVo(entity, content, testClass, attrs, testData.getDictMap()));
  33. Map<String, Object> productMap = mapper.readValue(json, Map.class);
  34. id = productMap.get(TestAttrFieldCodes.BASE_FIELD_ID).toString();
  35. if (productMap.containsKey(ATTRVALUES)) {
  36. Map<String, Object> result = new HashMap<>();
  37. Map<String, Object> attr = (Map<String, Object>) productMap.get(ATTRVALUES);
  38. for (Map.Entry<String, Object> entry : attr.entrySet()) {
  39. if (entry.getValue() == null || StringUtils.isBlank(entry.getValue().toString())) {
  40. continue;
  41. }
  42. TestAttribute att = testAttributeMap.get(entry.getKey().toString());
  43. if (att != null) {
  44. result.put(att.getType() + SPLIT + entry.getKey(), entry.getValue());
  45. content.add(String.valueOf(entry.getValue()));
  46. }
  47. }
  48. productMap.put(ATTRVALUES, result);
  49. }
  50. productMap.put(ElasticsearchUtil.CONTENT, ElasticsearchUtil.SPLIT_SYMBOL + content.toString() + ElasticsearchUtil.SPLIT_SYMBOL);
  51. objects.add(productMap);
  52. }
  53. if (syncByAlias) {
  54. EsDataWriter.saveObjectsAsync(ElasticsearchUtil.getIndexAliasName(), objects);
  55. } else {
  56. EsDataWriter.saveObjectsAsync(ElasticsearchUtil.getIndexName(), objects);
  57. }
  58. } catch (JsonParseException e) {
  59. resData.clearData();
  60. logger.error("Failed to parse JSON data id= {} , message : {}", id, e);
  61. throw new ServerException("Failed to parse JSON data" + id, e);
  62. } catch (JsonMappingException e) {
  63. resData.clearData();
  64. logger.error("Failed to parse JSON data id= {} , message : {}", id, e);
  65. throw new ServerException("Failed to process JSON data" + id, e);
  66. } catch (IOException e) {
  67. resData.clearData();
  68. logger.error("Failed to parse JSON data id= {} , message : {}", id, e);
  69. throw new ServerException("Failed to execute" + id, e);
  70. }
  71. }

删除数据

  1. @Override
  2. public void delById(String id, String type) {
  3. try {
  4. DeleteRequest deleteRequest = new DeleteRequest(ElasticsearchUtil.getIndexAliasName(), id);
  5. EsClients.getClient().delete(deleteRequest, RequestOptions.DEFAULT);
  6. } catch (Exception e) {
  7. e.printStackTrace();
  8. }
  9. }

查询数据

  1. public Map<String, Object> getSearchData(String index, String type, String value, boolean isPreciseSearch, Integer page, Integer size) {
  2. Map<String, Object> result = new HashMap<>();
  3. Integer from = 0;
  4. if (page - 1 >= 0) {
  5. from = (page - 1) * size;
  6. }
  7. try {
  8. if (isPreciseSearch) {
  9. value = "*" + ElasticsearchUtil.SPLIT_SYMBOL + value.substring(1, value.length() - 1) + ElasticsearchUtil.SPLIT_SYMBOL + "*";
  10. }
  11. if (value.length() == 1) {
  12. value = CommonUtil.replaceRegexChars(value);
  13. } else {
  14. value = CommonUtil.replaceRegexChars(value).replace("\\*", "*");
  15. }
  16. SearchResponse response = this.getResponse(index, type, value, from, size);
  17. long totalRecords = response.getHits().getTotalHits().value;
  18. List<Map<String, Object>> list = new ArrayList<>();
  19. SearchHit[] ciHits = response.getHits().getHits();
  20. CacheContext context = new CacheContext();
  21. for (int i = 0; i < ciHits.length; i++) {
  22. String objId = ciHits[i].getId();
  23. Map<String, Object> map = this.filter(ciHits[i].getSourceAsMap(), objId, context);
  24. map.put(TestAttrFieldCodes.BASE_FIELD_ID, objId);
  25. list.add(map);
  26. }
  27. context.clear();
  28. result.put("totalRecords", totalRecords);
  29. result.put("list", list);
  30. return result;
  31. } catch (Exception e) {// NOSONAR 此处增加代码稳定性
  32. throw new ServerException("Failed to search data", e);
  33. }
  34. }
  35. private SearchResponse getResponse(String index, String type, String value, int from, int size) {
  36. SearchSourceBuilder sb = new SearchSourceBuilder();
  37. if (ConfigService.getInstance().isEsQueryNeedPointTotalOAOContxtObo()) {
  38. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
  39. .must(QueryBuilders.queryStringQuery("*" + value + "*").defaultField(ElasticsearchUtil.getQueryContent()));
  40. // 是否开启分词查询
  41. if (ConfigService.getInstance().isOpenEsParticipleQuery()) {
  42. queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.matchPhraseQuery(ElasticsearchUtil.getQueryContent(), value));
  43. }
  44. queryBuilder.filter(
  45. QueryBuilders.boolQuery().must(QueryBuilders.termQuery(ResAttrFieldCodes.BASE_FIELD_TENANT, ThreadLocalContext.getTenantId())));
  46. sb.query(queryBuilder);
  47. } else {
  48. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(value + "*")).filter(
  49. QueryBuilders.boolQuery().must(QueryBuilders.termQuery(ResAttrFieldCodes.BASE_FIELD_TENANT, ThreadLocalContext.getTenantId())));
  50. sb.query(queryBuilder);
  51. }
  52. // 分页信息
  53. sb.from(from);
  54. sb.size(size);
  55. // 是否统计总数
  56. sb.trackTotalHits(true);
  57. sb.sort(ConstantFields.UPDATE_TIME, SortOrder.DESC);
  58. List<String> indexs = new ArrayList<>();
  59. indexs.add(index);
  60. return EsDataReader.query(indexs, sb);
  61. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/68684
推荐阅读
相关标签
  

闽ICP备14008679号