当前位置:   article > 正文

java 使用springBoot 操作Elasticsearch组件_springboot es bulkrequest bulkprocessor

springboot es bulkrequest bulkprocessor

简介

  该代码是以组件的形式写入,使用时在其他方法中注入  ElasticSearchUtil 类进行使用,主要实现了动态的增删改查,分页模糊查询,以指定字段查询与全部字段进行全差

should和must的比较

  1. should模式:就算有一个字段没有匹配的也会有结果返回
  2. must模式:必须要求所有字段都匹配到,只要有一个字段不匹配就没有搜索结果

版本信息

  • Elasticsearch:2.x
  • jdk  1.8

maven 依赖

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>1.5.8.RELEASE</version>
  5. </parent>
  6. <properties>
  7. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  8. <java.version>1.8</java.version>
  9. </properties>
  10. <dependencies>
  11. <!-- elasticsearch start -->
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  15. </dependency>
  16. <!-- springBoot start -->
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-starter-test</artifactId>
  20. <scope>test</scope>
  21. </dependency>
  22. <dependency>
  23. <groupId>com.sun.jna</groupId>
  24. <artifactId>jna</artifactId>
  25. <version>3.0.9</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-jetty</artifactId>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-starter-aop</artifactId>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-web</artifactId>
  38. </dependency>
  39. <!-- springBoot end -->
  40. <!-- json start -->
  41. <dependency>
  42. <groupId>com.alibaba</groupId>
  43. <artifactId>fastjson</artifactId>
  44. <version>1.2.4</version>
  45. </dependency>
  46. <!-- json end -->
  47. </dependencies>

config 底层操作类

  1. package *.config;
  2. import java.lang.reflect.Field;
  3. import java.util.Map;
  4. import java.util.Set;
  5. import javax.annotation.PostConstruct;
  6. import javax.annotation.PreDestroy;
  7. import org.elasticsearch.action.bulk.BackoffPolicy;
  8. import org.elasticsearch.action.bulk.BulkProcessor;
  9. import org.elasticsearch.action.bulk.BulkRequest;
  10. import org.elasticsearch.action.bulk.BulkResponse;
  11. import org.elasticsearch.action.delete.DeleteResponse;
  12. import org.elasticsearch.action.get.GetResponse;
  13. import org.elasticsearch.action.index.IndexResponse;
  14. import org.elasticsearch.action.search.SearchRequestBuilder;
  15. import org.elasticsearch.action.search.SearchResponse;
  16. import org.elasticsearch.action.update.UpdateResponse;
  17. import org.elasticsearch.client.Client;
  18. import org.elasticsearch.common.unit.ByteSizeUnit;
  19. import org.elasticsearch.common.unit.ByteSizeValue;
  20. import org.elasticsearch.common.unit.TimeValue;
  21. import org.elasticsearch.index.query.BoolQueryBuilder;
  22. import org.elasticsearch.index.query.QueryBuilders;
  23. import org.elasticsearch.search.SearchHit;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.stereotype.Component;
  28. import com.alibaba.fastjson.JSONArray;
  29. import com.alibaba.fastjson.JSONObject;
  30. /**
  31. * 底层操作类
  32. * @ClassName: ElasticConfigration
  33. * @date 2018年11月20日
  34. * @author tang wang
  35. *
  36. */
  37. @Component
  38. public class ElasticConfigration {
  39. private final Logger logger = LoggerFactory.getLogger(ElasticConfigration.class);
  40. @Autowired
  41. private Client client;
  42. private BulkProcessor bulkProcessor;
  43. @PostConstruct
  44. public void initBulkProcessor() {
  45. bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
  46. @Override
  47. public void beforeBulk(long executionId, BulkRequest request) {
  48. logger.info("序号:{} 开始执行{} 条记录保存", executionId, request.numberOfActions());
  49. }
  50. @Override
  51. public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
  52. logger.error(String.format("序号:%s 执行失败; 总记录数:%s", executionId, request.numberOfActions()), failure);
  53. }
  54. @Override
  55. public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
  56. logger.info("序号:{} 执行{}条记录保存成功,耗时:{}毫秒,", executionId,
  57. request.numberOfActions(), response.getTookInMillis());
  58. }
  59. }).setBulkActions(1000)
  60. .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
  61. .setConcurrentRequests(4)
  62. .setFlushInterval(TimeValue.timeValueSeconds(5))
  63. /**
  64. * 失败后等待多久及重试次数
  65. */
  66. .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(500), 3))
  67. .build();
  68. }
  69. @PreDestroy
  70. public void closeBulk() {
  71. if (bulkProcessor != null) {
  72. try {
  73. bulkProcessor.close();
  74. } catch (Exception e) {
  75. logger.error("close bulkProcessor exception", e);
  76. }
  77. }
  78. }
  79. /**
  80. * 批量添加,性能最好
  81. *
  82. */
  83. public void addDocumentToBulkProcessor(String indices, String type, Object object) {
  84. bulkProcessor.add(client.prepareIndex(indices, type).setSource(JSONObject.toJSONString(object)).request());
  85. }
  86. /**
  87. * 添加数据
  88. * @Title: ElasticConfigration
  89. * @Description:
  90. * @param indices 索引名字
  91. * @param type 索引类型
  92. * @param object 索引数据
  93. * @author: tao wang
  94. * @date: 2018年11月14日
  95. * @throws
  96. */
  97. public void addDocument(String indices, String type, Object object) {
  98. IndexResponse resp = client.prepareIndex(indices, type).setSource(JSONObject.toJSONString(object)).get();
  99. logger.info("添加结果:{}", resp.toString());
  100. }
  101. /**
  102. * 按照Id 进行删除
  103. * @Title: ElasticConfigration
  104. * @Description:
  105. * @param index 索引名称
  106. * @param type 索引类型
  107. * @param id 数据Id
  108. * @author: tao wang
  109. * @date: 2018年11月14日
  110. * @throws
  111. */
  112. public void deleteDocumentById(String index, String type, String id) {
  113. // new DeleteByQueryRequest(search);
  114. DeleteResponse resp = client.prepareDelete(index, type, id).get();
  115. logger.info("删除结果:{}", resp.toString());
  116. }
  117. /**
  118. * 查询单个数据
  119. * @param index 引擎名称
  120. * @param type 引擎类型
  121. * @param id 引擎Id
  122. * @return
  123. * @auth tao wang
  124. */
  125. public JSONObject getDocmentById(String index, String type, String id) {
  126. JSONObject obj = new JSONObject();
  127. GetResponse getResponse = client.prepareGet(index, type, id).get();
  128. Map<String, Object> map = getResponse.getSource();
  129. obj.put("id", id);
  130. obj.put("value", map);
  131. return JSONObject.parseObject(JSONObject.toJSONString(map));
  132. }
  133. /**
  134. * 按ID更新
  135. * @Title: ElasticConfigration
  136. * @Description:
  137. * @param indices 索引名称
  138. * @param type 索引类型
  139. * @param id 数据Id
  140. * @param object 数据值
  141. * @author: tao wang
  142. * @date: 2018年11月14日
  143. * @throws
  144. */
  145. public void updateDocument(String indices, String type, String id, Object object) {
  146. UpdateResponse resp = client.prepareUpdate(indices, type, id).setDoc(JSONObject.toJSONString(object)).get();
  147. logger.info("更新结果:{}", resp.toString());
  148. }
  149. /**
  150. * 查询所有字段,默认分页
  151. * @Title: ElasticConfigration
  152. * @Description:
  153. * @param indices 索引名称
  154. * @param type 索引类型
  155. * @param clazz 返回的集合对象
  156. * @return
  157. * @author: tao wang
  158. * @date: 2018年11月14日
  159. * @throws
  160. */
  161. public JSONArray queryDocumentByParam(String indices, String type) {
  162. SearchRequestBuilder builder = buildRequest(indices, type);
  163. SearchResponse resp = builder.get();
  164. return convertResponse(resp);
  165. }
  166. /**
  167. * 进行分页数据查询
  168. * @param indices 索引名称
  169. * @param type 索引类型
  170. * @param key 模糊字段
  171. * @param value 模糊字段
  172. * @param pageNumber 当前页数
  173. * @param pageSize 每页显示数据
  174. * @return
  175. * @auth tao wang
  176. * @date 2018年11月26日
  177. */
  178. public JSONArray queryDocumentByParam(String indices, String type, String key, String value, Integer pageNumber, Integer pageSize) {
  179. SearchRequestBuilder builder = buildRequest(indices, type);
  180. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  181. /**
  182. * must 满足所有才返回
  183. */
  184. boolQueryBuilder.must(QueryBuilders.wildcardQuery(key, ("*" + value + "*").toLowerCase()));
  185. builder.setQuery(boolQueryBuilder);
  186. builder.setFrom(pageNumber).setSize(pageSize);
  187. SearchResponse resp = builder.get();
  188. return convertResponse(resp);
  189. }
  190. /**
  191. * 进行模糊查询所有,不进行分页
  192. * @param indices 索引名称
  193. * @param type 索引类型
  194. * @param value 模糊字段
  195. * @param obj 类
  196. * @return
  197. * @auth tao wang
  198. * @date 2018年11月26日
  199. */
  200. @SuppressWarnings("rawtypes")
  201. public JSONArray queryDocumentByParam(String indices, String type, String value, Class clazz) {
  202. SearchRequestBuilder builder = buildRequest(indices, type);
  203. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  204. // 获取实体类的所有属性信息,返回Field数组
  205. Field[] fields = clazz.getDeclaredFields();
  206. System.err.println(fields.toString());
  207. for (Field field : fields) {
  208. /**
  209. * should 满足一个都返回
  210. */
  211. boolQueryBuilder.should(QueryBuilders.wildcardQuery(field.getName(), ("*" + value + "*").toLowerCase()));
  212. System.err.println(field.getName());
  213. }
  214. builder.setQuery(boolQueryBuilder);
  215. SearchResponse resp = builder.get();
  216. return convertResponse(resp);
  217. }
  218. /**
  219. * 进行模糊查询所有,不进行分页
  220. * @param indices 索引名称
  221. * @param type 索引类型
  222. * @param value 模糊字段
  223. * @param setKey 类的属性名称
  224. * @return
  225. * @auth tao wang
  226. * @date 2018年11月26日
  227. */
  228. public JSONArray queryDocumentByParam(String indices, String type, String value, Set<String> setKey) {
  229. SearchRequestBuilder builder = buildRequest(indices, type);
  230. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  231. for (String string : setKey) {
  232. boolQueryBuilder.should(QueryBuilders.wildcardQuery(string, ("*" + value + "*").toLowerCase()));
  233. }
  234. builder.setQuery(boolQueryBuilder);
  235. SearchResponse resp = builder.get();
  236. return convertResponse(resp);
  237. }
  238. /**
  239. * 通用的装换返回结果
  240. * @Title: ElasticConfigration
  241. * @Description:
  242. * @param response 数据
  243. * @param clazz 实体类
  244. * @return
  245. * @author: tao wang
  246. * @date: 2018年11月14日
  247. * @throws
  248. */
  249. public JSONArray convertResponse(SearchResponse response) {
  250. JSONArray list = new JSONArray();
  251. if (response != null && response.getHits() != null) {
  252. for (SearchHit hit : response.getHits()) {
  253. Map<String, Object> source = hit.getSource();
  254. String result = JSONObject.toJSONString(source);
  255. if (org.springframework.util.StringUtils.hasText(result)) {
  256. JSONObject obj = new JSONObject();
  257. obj.put("id", hit.getId());
  258. obj.put("value", result);
  259. list.add(obj);
  260. }
  261. }
  262. }
  263. return list;
  264. }
  265. /**
  266. * 进行数据数据绑定
  267. * @Title: ElasticConfigration
  268. * @Description:
  269. * @param indices 索引名称
  270. * @param type 索引类型
  271. * @return
  272. * @author: tao wang
  273. * @date: 2018年11月14日
  274. * @throws
  275. */
  276. public SearchRequestBuilder buildRequest(String indices, String type) {
  277. return client.prepareSearch(indices).setTypes(type);
  278. }
  279. }

业务实现类:

 

  1. package *.util;
  2. import java.util.Set;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import com.alibaba.fastjson.JSONArray;
  6. import com.alibaba.fastjson.JSONObject;
  7. import *.config.ElasticConfigration;
  8. /**
  9. * 业务实现接口
  10. * @ClassName: ElasticSearchUtil
  11. * @date 2018年11月20日
  12. * @author tang wang
  13. *
  14. */
  15. @Component
  16. public class ElasticSearchUtil {
  17. @Autowired
  18. private ElasticConfigration elasticConfigration;
  19. /**
  20. * 添加指定的索引名称和类型数据
  21. * @Title: ElasticConfigUtil
  22. * @Description:
  23. * @param indices 索引名称
  24. * @param type 索引类型
  25. * @param object JSON 对象
  26. * @return
  27. * @author: tao wang
  28. * @date: 2018年11月14日
  29. * @throws
  30. */
  31. public boolean insert(String indices, String type, JSONObject object) {
  32. elasticConfigration.addDocument(indices.toLowerCase(), type.toLowerCase(), object);
  33. return true;
  34. }
  35. /**
  36. * 添加多条数据
  37. * @param indices 索引名称
  38. * @param type 索引类型
  39. * @param array 索引的对象集合
  40. * @return
  41. */
  42. public boolean insert(String indices, String type, JSONArray array) {
  43. for (Object object : array) {
  44. elasticConfigration.addDocument(indices.toLowerCase(), type.toLowerCase(), object);
  45. }
  46. return true;
  47. }
  48. /**
  49. * 删除单条数据
  50. * @param @param indices 索引名称
  51. * @param @param type 索引类型
  52. * @param @param id 索引Id
  53. * @param @return
  54. * @return
  55. * @auth tao wang
  56. */
  57. public boolean delete(String indices, String type, String id) {
  58. elasticConfigration.deleteDocumentById(indices.toLowerCase(), type.toLowerCase(), id);
  59. return true;
  60. }
  61. /**
  62. * 删除多条数据
  63. * @param @param indices 索引名称
  64. * @param @param type 索引类型
  65. * @param @param ids id 集合
  66. * @param @return
  67. * @return
  68. * @auth tao wang
  69. */
  70. public boolean delete(String indices, String type, Set<String> ids) {
  71. for (String id : ids) {
  72. elasticConfigration.deleteDocumentById(indices.toLowerCase(), type.toLowerCase(), id);
  73. }
  74. return true;
  75. }
  76. /**
  77. * 获取单个数据
  78. * @param @param index 索引名称
  79. * @param @param type 索引类型
  80. * @param @param id 索引Id
  81. * @param @return 返回数据
  82. * @return
  83. * @auth tao wang
  84. */
  85. public JSONObject getDocmentById(String indices, String type, String id) {
  86. return elasticConfigration.getDocmentById(indices.toLowerCase(), type.toLowerCase(), id);
  87. }
  88. /**
  89. * 修改单条数据
  90. * @param @param indices 索引名称
  91. * @param @param type 索引类型
  92. * @param @param id 索引Id
  93. * @param @param object 修改数据 数据不修改传原值
  94. * @param @return
  95. * @return
  96. * @auth tao wang
  97. */
  98. public boolean update(String indices, String type, String id, JSONObject object) {
  99. elasticConfigration.updateDocument(indices.toLowerCase(), type.toLowerCase(), id, object);
  100. return true;
  101. }
  102. /**
  103. * 分页查询搜索引擎里面的数据
  104. * @Title: ElasticSearchUtil
  105. * @Description:
  106. * @param pageNumber 当前页数
  107. * @param pageSize 每页条数
  108. * @param key 要模糊字段名称
  109. * @param value 要模糊字段值
  110. * @param indices 索引名称
  111. * @param type 索引类型
  112. * @param
  113. * @return
  114. * @author: tao wang
  115. * @date: 2018年11月12日
  116. * @throws
  117. */
  118. public JSONArray searchEmployee(String indices, String type, Integer pageNumber,
  119. Integer pageSize, String key, String value) {
  120. return elasticConfigration.queryDocumentByParam(indices.toLowerCase(), type.toLowerCase(),
  121. key, value, pageNumber, pageSize);
  122. }
  123. /**
  124. * 查询搜索里面的数据
  125. * @param indices 索引名称
  126. * @param type 索引类型
  127. * @param value 模糊字段名称
  128. * @param obj 类属性
  129. * @return
  130. * @auth tao wang
  131. * @date 2018年11月26日
  132. */
  133. @SuppressWarnings("rawtypes")
  134. public JSONArray searchEmployee(String indices, String type, String value, Class clazz) {
  135. return elasticConfigration.queryDocumentByParam(indices.toLowerCase(), type.toLowerCase(), value, clazz);
  136. }
  137. /**
  138. * 查询搜索里面的数据
  139. * @param indices 索引名称
  140. * @param type 索引类型
  141. * @param value 模糊字段名称
  142. * @param setKsy 类属性名称集合
  143. * @return
  144. * @auth tao wang
  145. * @date 2018年11月26日
  146. */
  147. public JSONArray searchEmployee(String indices, String type, String value, Set<String> setKsy) {
  148. return elasticConfigration.queryDocumentByParam(indices.toLowerCase(), type.toLowerCase(), value, setKsy);
  149. }
  150. /**
  151. * 不进行分页查询
  152. * @Title: ElasticSearchUtil
  153. * @Description:
  154. * @param key 要模糊字段名称,多个中间逗号分开
  155. * @param value 要模糊字段值
  156. * @param indices 索引名称
  157. * @param type 索引类型
  158. * @param clazz 要返回的类集合
  159. * @return
  160. * @author: tao wang
  161. * @date: 2018年11月12日
  162. * @throws
  163. */
  164. public JSONArray searchEmployee(String indices, String type) {
  165. return elasticConfigration.queryDocumentByParam(indices.toLowerCase(), type.toLowerCase());
  166. }
  167. }

配置类:application.properties

  1. ##es的名称 默认为elasticsearch
  2. spring.data.elasticsearch.cluster-name=*
  3. #配置es节点信息,逗号分隔,如果没有指定,则启动ClientNode
  4. spring.data.elasticsearch.cluster-nodes=*:9300

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/334412
推荐阅读
相关标签
  

闽ICP备14008679号