当前位置:   article > 正文

elasticsearch(8) : javaApi_elasticsearch8 java api

elasticsearch8 java api

参考 : ES工具类ESUtils_心静自然禅的专栏-CSDN博客_es工具类 

Maven 依赖(以下示例使用 7.4.2 版本的 elasticsearch-rest-high-level-client)

  <!-- Bean population helper used by the map-to-model converters in the Java listings.
       1.9.4 suppresses access to the "class" property by default (CVE-2019-10086),
       which matters because BeanUtils.populate is fed maps read back from Elasticsearch. -->
  <dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.4</version>
  </dependency>
  <!-- High-level REST client. Its transitive elasticsearch and elasticsearch-rest-client
       artifacts are excluded here and re-declared below with an explicit version
       (presumably so that all three resolve to the same 7.4.2 release). -->
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.2</version>
    <exclusions>
      <exclusion>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- Low-level REST client, pinned to the same version as the high-level client. -->
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.4.2</version>
  </dependency>
  <!-- Elasticsearch core classes (request/response and query builder types). -->
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.4.2</version>
  </dependency>

Spring Boot Java 代码

  1. import com.alibaba.fastjson.JSON;
  2. import com.aliyun.autodeploy.agent.service.bo.LogSourceBO;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.beanutils.BeanUtils;
  5. import org.apache.http.HttpHost;
  6. import org.apache.http.client.config.RequestConfig;
  7. import org.elasticsearch.action.bulk.BulkRequest;
  8. import org.elasticsearch.action.bulk.BulkResponse;
  9. import org.elasticsearch.action.index.IndexRequest;
  10. import org.elasticsearch.action.index.IndexResponse;
  11. import org.elasticsearch.action.search.SearchRequest;
  12. import org.elasticsearch.action.search.SearchResponse;
  13. import org.elasticsearch.client.RequestOptions;
  14. import org.elasticsearch.client.RestClient;
  15. import org.elasticsearch.client.RestClientBuilder;
  16. import org.elasticsearch.client.RestHighLevelClient;
  17. import org.elasticsearch.common.xcontent.XContentType;
  18. import org.elasticsearch.index.query.QueryBuilder;
  19. import org.elasticsearch.index.query.QueryBuilders;
  20. import org.elasticsearch.search.SearchHit;
  21. import org.elasticsearch.search.builder.SearchSourceBuilder;
  22. import org.springframework.beans.factory.annotation.Value;
  23. import org.springframework.stereotype.Service;
  24. import javax.annotation.PostConstruct;
  25. import java.util.LinkedList;
  26. import java.util.List;
  27. import java.util.Map;
  28. /**
  29. * @Author: liyue
  30. * @Date: 2021/12/10/17:41
  31. * @Description:
  32. */
  33. @Service
  34. @Slf4j
  35. public class ElasticsearchService {
  36. private RestHighLevelClient restHighLevelClient;
  37. @Value("${es.hostname}")
  38. private String esHostname;
  39. @Value("${es.port}")
  40. private Integer esPort;
  41. @PostConstruct
  42. private void run() {
  43. // 设置IP
  44. HttpHost esHost = new HttpHost(esHostname, esPort);
  45. RestClientBuilder restClientBuilder = RestClient.builder(esHost);
  46. setTimeout(restClientBuilder);
  47. restHighLevelClient = new RestHighLevelClient(restClientBuilder);
  48. }
  49. /**
  50. * 设置超时时间
  51. */
  52. private void setTimeout(RestClientBuilder restClientBuilder) {
  53. restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
  54. @Override
  55. public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
  56. return builder.setConnectTimeout(3000)
  57. .setSocketTimeout(50000);
  58. }
  59. });
  60. }
  61. /**
  62. * 新增
  63. * @return 保存结果
  64. * @throws Exception exception
  65. */
  66. public IndexResponse save(Object o, String index){
  67. try {
  68. IndexRequest indexRequest = new IndexRequest(index);
  69. indexRequest.source(JSON.toJSONString(o), XContentType.JSON);
  70. indexRequest.id();
  71. return restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  72. } catch (Exception e) {
  73. log.error("插入数据失败",e);
  74. return null;
  75. }
  76. }
  77. /**
  78. * 批量新增
  79. *
  80. * @param list 数据list
  81. * @return 返回保存结果
  82. * @throws Exception exception
  83. */
  84. public BulkResponse batchSave(List<LogSourceBO> list, String index){
  85. try {
  86. log.info("数据发送ES,size:{},index:{}",list.size(),index);
  87. BulkRequest bulkRequest = new BulkRequest();
  88. IndexRequest indexRequest;
  89. for (LogSourceBO item : list) {
  90. indexRequest = new IndexRequest(index);
  91. indexRequest.source(JSON.toJSONString(item), XContentType.JSON);
  92. indexRequest.id();
  93. bulkRequest.add(indexRequest);
  94. }
  95. return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  96. }catch (Exception e){
  97. log.error("批量保存失败",e);
  98. return null;
  99. }
  100. }
  101. /**
  102. * 查询符合查询条件的所有数据
  103. *
  104. * @return 返回结果list
  105. * @throws Exception exception
  106. */
  107. public List<LogSourceBO> query(Long startTime, Long endTime, String index) {
  108. List<LogSourceBO> logSourceBOS = new LinkedList<>();
  109. try {
  110. SearchRequest request = new SearchRequest();
  111. request.indices(index);
  112. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  113. QueryBuilder queryBuilder = QueryBuilders.boolQuery()
  114. .must(QueryBuilders.rangeQuery("timestamp")
  115. .gte(startTime)
  116. .lte(endTime));
  117. searchSourceBuilder.query(queryBuilder);
  118. request.source(searchSourceBuilder);
  119. SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  120. if (res.getHits().getHits() != null) {
  121. SearchHit[] hits = res.getHits().getHits();
  122. for (SearchHit hit : hits) {
  123. Map<String, Object> resultOne = hit.getSourceAsMap();
  124. logSourceBOS.add(convertMap2Model(resultOne));
  125. }
  126. }
  127. }catch (Exception e){
  128. log.error("查询失败",e);
  129. }
  130. return logSourceBOS;
  131. }
  132. private LogSourceBO convertMap2Model(Map<String, Object> map) throws Exception {
  133. LogSourceBO logSourceBO = new LogSourceBO();
  134. BeanUtils.populate(logSourceBO, map);
  135. return logSourceBO;
  136. }
  137. }

测试

  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.aliyun.gts.tt.ms.service.bo.LogSourceBO;
  4. import org.apache.commons.beanutils.BeanUtils;
  5. import org.apache.http.HttpHost;
  6. import org.apache.http.client.config.RequestConfig;
  7. import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  8. import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  9. import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
  10. import org.elasticsearch.action.index.IndexRequest;
  11. import org.elasticsearch.action.index.IndexResponse;
  12. import org.elasticsearch.action.search.SearchRequest;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.client.RequestOptions;
  15. import org.elasticsearch.client.RestClient;
  16. import org.elasticsearch.client.RestClientBuilder;
  17. import org.elasticsearch.client.RestHighLevelClient;
  18. import org.elasticsearch.common.xcontent.XContentType;
  19. import org.elasticsearch.index.query.QueryBuilder;
  20. import org.elasticsearch.index.query.QueryBuilders;
  21. import org.elasticsearch.search.SearchHit;
  22. import org.elasticsearch.search.builder.SearchSourceBuilder;
  23. import java.io.IOException;
  24. import java.util.LinkedList;
  25. import java.util.List;
  26. import java.util.Map;
  27. /**
  28. * @Author: liyue
  29. * @Date: 2021/12/10/16:29
  30. * @Description:
  31. */
  32. public class Test {
  33. public static final String INDEX = "test_index_2021_12_10";
  34. private static RestHighLevelClient restHighLevelClient;
  35. static {
  36. HttpHost esHost = new HttpHost("127.0.0.1", 9200);
  37. RestClientBuilder restClientBuilder = RestClient.builder(esHost);
  38. setTimeout(restClientBuilder);
  39. restHighLevelClient = new RestHighLevelClient(restClientBuilder);
  40. }
  41. public static void main(String[] args) throws Exception {
  42. /* LogSourceBO testDTO = new LogSourceBO(
  43. System.currentTimeMillis(),
  44. "g1","u1","d1","k1",1.0
  45. );
  46. save(testDTO);
  47. */
  48. List<LogSourceBO> g1 = query(1639150440001L, 1639150440001L);
  49. System.out.println(JSONObject.toJSONString(g1));
  50. restHighLevelClient.close();
  51. }
  52. /**
  53. * 查询符合查询条件的所有数据
  54. * @return 返回结果list
  55. * @throws Exception exception
  56. */
  57. public static List<LogSourceBO> query(Long startTime, Long endTime) throws Exception {
  58. List<LogSourceBO> logSourceBOS = new LinkedList<>();
  59. SearchRequest request = new SearchRequest();
  60. request.indices(INDEX);
  61. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  62. QueryBuilder queryBuilder = QueryBuilders.boolQuery()
  63. .must(QueryBuilders.rangeQuery("timestamp")
  64. .gte(startTime)
  65. .lte(endTime));
  66. searchSourceBuilder.query(queryBuilder);
  67. request.source(searchSourceBuilder);
  68. SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  69. if (res.getHits().getHits() != null) {
  70. SearchHit[] hits = res.getHits().getHits();
  71. for (SearchHit hit : hits) {
  72. Map<String, Object> resultOne = hit.getSourceAsMap();
  73. logSourceBOS.add(convertMap2Model(resultOne));
  74. }
  75. }
  76. return logSourceBOS;
  77. }
  78. private static LogSourceBO convertMap2Model(Map<String, Object> map) throws Exception {
  79. LogSourceBO logSourceBO = new LogSourceBO();
  80. BeanUtils.populate(logSourceBO, map);
  81. return logSourceBO;
  82. }
  83. /**
  84. * 创建索引,新版ES插入数据时自动创建
  85. *
  86. * @return 返回索引创建结果
  87. * @throws IOException exception
  88. */
  89. public static CreateIndexResponse createIndex() throws IOException {
  90. CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
  91. return restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  92. }
  93. /**
  94. * 索引是否存在
  95. *
  96. * @return true/false
  97. */
  98. public static boolean existIndex() {
  99. GetIndexRequest request = new GetIndexRequest();
  100. request.indices(INDEX);
  101. try {
  102. return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  103. } catch (IOException e) {
  104. e.printStackTrace();
  105. return false;
  106. }
  107. }
  108. /**
  109. * 新增/修改数据
  110. *
  111. * @return 保存结果
  112. * @throws Exception exception
  113. */
  114. public static IndexResponse save(Object o) throws Exception {
  115. IndexRequest indexRequest = new IndexRequest(INDEX);
  116. indexRequest.source(JSON.toJSONString(o), XContentType.JSON);
  117. indexRequest.id();
  118. return restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  119. }
  120. /**
  121. * 设置超时时间
  122. */
  123. private static void setTimeout(RestClientBuilder restClientBuilder) {
  124. restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
  125. @Override
  126. public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
  127. return builder.setConnectTimeout(3000)
  128. .setSocketTimeout(50000);
  129. }
  130. });
  131. }
  132. }

参考

  1. import com.alibaba.fastjson.JSON;
  2. import org.apache.commons.beanutils.BeanUtils;
  3. import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  4. import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  5. import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  6. import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
  7. import org.elasticsearch.action.bulk.BulkRequest;
  8. import org.elasticsearch.action.bulk.BulkResponse;
  9. import org.elasticsearch.action.get.GetRequest;
  10. import org.elasticsearch.action.get.GetResponse;
  11. import org.elasticsearch.action.index.IndexRequest;
  12. import org.elasticsearch.action.index.IndexResponse;
  13. import org.elasticsearch.action.search.SearchRequest;
  14. import org.elasticsearch.action.search.SearchResponse;
  15. import org.elasticsearch.action.support.master.AcknowledgedResponse;
  16. import org.elasticsearch.client.RequestOptions;
  17. import org.elasticsearch.client.RestClientBuilder;
  18. import org.elasticsearch.client.RestHighLevelClient;
  19. import org.elasticsearch.common.xcontent.XContentType;
  20. import org.elasticsearch.index.query.QueryBuilder;
  21. import org.elasticsearch.search.SearchHit;
  22. import org.elasticsearch.search.builder.SearchSourceBuilder;
  23. import org.elasticsearch.search.sort.SortOrder;
  24. import java.io.IOException;
  25. import java.lang.reflect.Method;
  26. import java.util.ArrayList;
  27. import java.util.List;
  28. import java.util.Map;
  29. /**
  30. * @ProjectName: pet_data
  31. * @Package: com.example.demo.es
  32. * @ClassName: EsUtils
  33. * @Author: Devin.W.Zhang
  34. * @Description: ES 操作工具类
  35. * @Date: 12/22/2020 10:58 AM
  36. * @Version: 1.0
  37. */
  38. public class EsUtils<T> {
  39. public static final String INDEX = "ly-test-20211210";
  40. private RestHighLevelClient restHighLevelClient;
  41. /**
  42. * 索引是否存在
  43. *
  44. * @param indexName 索引名称
  45. * @return true/false
  46. */
  47. public boolean existIndex(String indexName) {
  48. GetIndexRequest request = new GetIndexRequest();
  49. request.indices(indexName);
  50. try {
  51. return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  52. } catch (IOException e) {
  53. e.printStackTrace();
  54. return false;
  55. }
  56. }
  57. /**
  58. * 创建索引,新版ES插入数据时自动创建
  59. *
  60. * @param index 索引
  61. * @return 返回索引创建结果
  62. * @throws IOException exception
  63. */
  64. public CreateIndexResponse createIndex(String index) throws IOException {
  65. CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
  66. return restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  67. }
  68. /**
  69. * 删除索引
  70. *
  71. * @param index 索引
  72. * @return 返回删除结果
  73. * @throws IOException exception
  74. */
  75. public AcknowledgedResponse deleteIndex(String index) throws IOException {
  76. DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
  77. return restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
  78. }
  79. /**
  80. * 通过id获取数据
  81. *
  82. * @param id 主键
  83. * @return 返回查询到的结果
  84. * @throws IOException exception
  85. */
  86. public T get(String id, Class<T> classT) throws Exception {
  87. System.out.println(String.format("get the data from es, id:%s", id));
  88. GetRequest request = new GetRequest(INDEX, id);
  89. GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
  90. Map<String, Object> result = getResponse.getSourceAsMap();
  91. System.out.println(String.format("the es response:%s", result));
  92. T t = convertMap2Model(result, classT);
  93. System.out.println(String.format("change the result to %s: ,the result is :%s", classT, t));
  94. return t;
  95. }
  96. /**
  97. * 新增/修改数据
  98. *
  99. * @param t 数据对象
  100. * @param primaryKey 主键字段名
  101. * @return 保存结果
  102. * @throws Exception exception
  103. */
  104. public IndexResponse save(T t, String primaryKey) throws Exception {
  105. IndexRequest indexRequest = new IndexRequest(INDEX);
  106. indexRequest.source(JSON.toJSONString(t), XContentType.JSON);
  107. indexRequest.id(getId(t, primaryKey));
  108. return restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  109. }
  110. /**
  111. * 批量 新增/修改数据
  112. *
  113. * @param list 数据list
  114. * @param primaryKey 主键
  115. * @return 返回保存结果
  116. * @throws Exception exception
  117. */
  118. public BulkResponse batchSave(List<T> list, String primaryKey) throws Exception {
  119. BulkRequest bulkRequest = new BulkRequest();
  120. IndexRequest indexRequest;
  121. for (T item : list) {
  122. indexRequest = new IndexRequest(INDEX);
  123. indexRequest.source(JSON.toJSONString(item), XContentType.JSON);
  124. indexRequest.id(getId(item, primaryKey));
  125. bulkRequest.add(indexRequest);
  126. }
  127. return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  128. }
  129. /**
  130. * 查询符合查询条件的所有数据
  131. *
  132. * @param queryBuilder queryBuilder
  133. * @param orders 排序map
  134. * @param tClass class
  135. * @return 返回结果list
  136. * @throws Exception exception
  137. */
  138. public List<T> query(QueryBuilder queryBuilder, Map<String, SortOrder> orders, Class<T> tClass) throws Exception {
  139. final int currentPage = 1;
  140. final int pageSize = 100000;
  141. Object[] obj = pageQuery(currentPage, pageSize, queryBuilder, orders, tClass);
  142. return (List<T>) obj[1];
  143. }
  144. /**
  145. * 分页查询
  146. * <p>
  147. * 注意当 (currentPage * pageSize >10000) 该分页查询会报错,
  148. * 通过postman put方式调用 http://127.0.0.1:9200/_all/_settings?preserve_existing=true 设置 {"index.max_result_window" : "1000000"}
  149. * 设置成功会返回 {"acknowledged": true}
  150. * </p>
  151. *
  152. * @param currentPage 当前页
  153. * @param pageSize 每页条数
  154. * @param queryBuilder queryBuilder
  155. * @param orders 排序map
  156. * @param tClass class
  157. * @return 返回 总条数和数据list
  158. * @throws Exception exception
  159. */
  160. public Object[] pageQuery(int currentPage, int pageSize, QueryBuilder queryBuilder, Map<String, SortOrder> orders, Class<T> tClass) throws Exception {
  161. Object[] result = new Object[2];
  162. long totalCount;
  163. List<T> list = new ArrayList<>();
  164. SearchRequest request = new SearchRequest();
  165. request.indices(INDEX);
  166. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  167. searchSourceBuilder.query(queryBuilder);
  168. request.source(searchSourceBuilder);
  169. //下面的配置为true则如果总条数显示为实际查询到的条数,否则最大只显示10000
  170. searchSourceBuilder.trackTotalHits(true);
  171. //起始
  172. searchSourceBuilder.from((currentPage - 1) * pageSize);
  173. //默认最大10000
  174. searchSourceBuilder.size(pageSize);
  175. //排序
  176. if (null != orders) {
  177. for (Map.Entry<String, SortOrder> order : orders.entrySet()) {
  178. searchSourceBuilder.sort(order.getKey(), order.getValue());
  179. }
  180. }
  181. SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  182. totalCount = res.getHits().getTotalHits().value;
  183. System.out.println(String.format("totalHits: %s", totalCount));
  184. if (res.getHits().getHits() != null) {
  185. SearchHit[] hits = res.getHits().getHits();
  186. for (SearchHit hit : hits) {
  187. Map<String, Object> resultOne = hit.getSourceAsMap();
  188. T t = convertMap2Model(resultOne, tClass);
  189. list.add(t);
  190. }
  191. }
  192. result[0] = totalCount;
  193. result[1] = list;
  194. return result;
  195. }
  196. /*下面是一些辅助方法*/
  197. /**
  198. * 通过反射获取主键的值
  199. *
  200. * @param t 对象
  201. * @param primaryKey 主键字段名称
  202. * @return 返回主键的值
  203. * @throws Exception exception
  204. */
  205. private String getId(T t, String primaryKey) throws Exception {
  206. Class<?> aClass = t.getClass();
  207. String methodName = "get" + (primaryKey.charAt(0) + "").toUpperCase() + primaryKey.substring(1);
  208. Method method = aClass.getDeclaredMethod(methodName);
  209. return (String) method.invoke(t);
  210. }
  211. private T convertMap2Model(Map<String, Object> map, Class<T> classT) throws Exception {
  212. Object o = classT.newInstance();
  213. BeanUtils.populate(o, map);
  214. return (T) o;
  215. }
  216. /**
  217. * 关闭连接
  218. */
  219. public void close() {
  220. try {
  221. if (restHighLevelClient != null) {
  222. restHighLevelClient.close();
  223. }
  224. } catch (Exception e) {
  225. e.printStackTrace();
  226. }
  227. }
  228. }

扩展

范围查询+精确查询

  1. SearchRequest request = new SearchRequest();
  2. request.indices(index);
  3. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  4. QueryBuilder queryBuilder = QueryBuilders.boolQuery()
  5. .must(QueryBuilders.rangeQuery("time")
  6. .gte(startTime)
  7. .lte(endTime))
  8. .must(QueryBuilders.termQuery("age","18));
  9. searchSourceBuilder.query(queryBuilder);
  10. searchSourceBuilder.size(10);
  11. searchSourceBuilder.from(0);
  12. request.source(searchSourceBuilder);
  13. SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/68757
推荐阅读
相关标签
  

闽ICP备14008679号