赞
踩
参考 : ES工具类ESUtils_心静自然禅的专栏-CSDN博客_es工具类
<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.4.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.4.2</version>
</dependency>
- import com.alibaba.fastjson.JSON;
- import com.aliyun.autodeploy.agent.service.bo.LogSourceBO;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.beanutils.BeanUtils;
- import org.apache.http.HttpHost;
- import org.apache.http.client.config.RequestConfig;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.PostConstruct;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
-
- /**
- * @Author: liyue
- * @Date: 2021/12/10/17:41
- * @Description:
- */
- @Service
- @Slf4j
- public class ElasticsearchService {
-
- private RestHighLevelClient restHighLevelClient;
-
- @Value("${es.hostname}")
- private String esHostname;
-
- @Value("${es.port}")
- private Integer esPort;
-
- @PostConstruct
- private void run() {
- // 设置IP
- HttpHost esHost = new HttpHost(esHostname, esPort);
- RestClientBuilder restClientBuilder = RestClient.builder(esHost);
- setTimeout(restClientBuilder);
- restHighLevelClient = new RestHighLevelClient(restClientBuilder);
- }
-
- /**
- * 设置超时时间
- */
- private void setTimeout(RestClientBuilder restClientBuilder) {
- restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
- @Override
- public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
- return builder.setConnectTimeout(3000)
- .setSocketTimeout(50000);
- }
- });
- }
-
- /**
- * 新增
- * @return 保存结果
- * @throws Exception exception
- */
- public IndexResponse save(Object o, String index){
- try {
- IndexRequest indexRequest = new IndexRequest(index);
- indexRequest.source(JSON.toJSONString(o), XContentType.JSON);
- indexRequest.id();
- return restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- } catch (Exception e) {
- log.error("插入数据失败",e);
- return null;
- }
- }
-
- /**
- * 批量新增
- *
- * @param list 数据list
- * @return 返回保存结果
- * @throws Exception exception
- */
- public BulkResponse batchSave(List<LogSourceBO> list, String index){
- try {
- log.info("数据发送ES,size:{},index:{}",list.size(),index);
- BulkRequest bulkRequest = new BulkRequest();
- IndexRequest indexRequest;
- for (LogSourceBO item : list) {
- indexRequest = new IndexRequest(index);
- indexRequest.source(JSON.toJSONString(item), XContentType.JSON);
- indexRequest.id();
- bulkRequest.add(indexRequest);
- }
- return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- }catch (Exception e){
- log.error("批量保存失败",e);
- return null;
- }
-
- }
-
- /**
- * 查询符合查询条件的所有数据
- *
- * @return 返回结果list
- * @throws Exception exception
- */
- public List<LogSourceBO> query(Long startTime, Long endTime, String index) {
- List<LogSourceBO> logSourceBOS = new LinkedList<>();
- try {
- SearchRequest request = new SearchRequest();
- request.indices(index);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- QueryBuilder queryBuilder = QueryBuilders.boolQuery()
- .must(QueryBuilders.rangeQuery("timestamp")
- .gte(startTime)
- .lte(endTime));
- searchSourceBuilder.query(queryBuilder);
- request.source(searchSourceBuilder);
- SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- if (res.getHits().getHits() != null) {
- SearchHit[] hits = res.getHits().getHits();
- for (SearchHit hit : hits) {
- Map<String, Object> resultOne = hit.getSourceAsMap();
- logSourceBOS.add(convertMap2Model(resultOne));
- }
- }
- }catch (Exception e){
- log.error("查询失败",e);
- }
- return logSourceBOS;
-
- }
-
-
- private LogSourceBO convertMap2Model(Map<String, Object> map) throws Exception {
- LogSourceBO logSourceBO = new LogSourceBO();
- BeanUtils.populate(logSourceBO, map);
- return logSourceBO;
- }
- }
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.aliyun.gts.tt.ms.service.bo.LogSourceBO;
- import org.apache.commons.beanutils.BeanUtils;
- import org.apache.http.HttpHost;
- import org.apache.http.client.config.RequestConfig;
- import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
- import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
- import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
-
- import java.io.IOException;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
-
- /**
- * @Author: liyue
- * @Date: 2021/12/10/16:29
- * @Description:
- */
- public class Test {
-
- public static final String INDEX = "test_index_2021_12_10";
-
- private static RestHighLevelClient restHighLevelClient;
-
- static {
- HttpHost esHost = new HttpHost("127.0.0.1", 9200);
- RestClientBuilder restClientBuilder = RestClient.builder(esHost);
- setTimeout(restClientBuilder);
- restHighLevelClient = new RestHighLevelClient(restClientBuilder);
- }
-
- public static void main(String[] args) throws Exception {
-
- /* LogSourceBO testDTO = new LogSourceBO(
- System.currentTimeMillis(),
- "g1","u1","d1","k1",1.0
- );
- save(testDTO);
- */
- List<LogSourceBO> g1 = query(1639150440001L, 1639150440001L);
-
- System.out.println(JSONObject.toJSONString(g1));
- restHighLevelClient.close();
- }
-
- /**
- * 查询符合查询条件的所有数据
- * @return 返回结果list
- * @throws Exception exception
- */
- public static List<LogSourceBO> query(Long startTime, Long endTime) throws Exception {
- List<LogSourceBO> logSourceBOS = new LinkedList<>();
- SearchRequest request = new SearchRequest();
- request.indices(INDEX);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- QueryBuilder queryBuilder = QueryBuilders.boolQuery()
- .must(QueryBuilders.rangeQuery("timestamp")
- .gte(startTime)
- .lte(endTime));
- searchSourceBuilder.query(queryBuilder);
- request.source(searchSourceBuilder);
- SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- if (res.getHits().getHits() != null) {
- SearchHit[] hits = res.getHits().getHits();
- for (SearchHit hit : hits) {
- Map<String, Object> resultOne = hit.getSourceAsMap();
- logSourceBOS.add(convertMap2Model(resultOne));
- }
- }
- return logSourceBOS;
- }
-
-
-
- private static LogSourceBO convertMap2Model(Map<String, Object> map) throws Exception {
- LogSourceBO logSourceBO = new LogSourceBO();
- BeanUtils.populate(logSourceBO, map);
- return logSourceBO;
- }
-
- /**
- * 创建索引,新版ES插入数据时自动创建
- *
- * @return 返回索引创建结果
- * @throws IOException exception
- */
- public static CreateIndexResponse createIndex() throws IOException {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX);
- return restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- }
-
- /**
- * 索引是否存在
- *
- * @return true/false
- */
- public static boolean existIndex() {
- GetIndexRequest request = new GetIndexRequest();
- request.indices(INDEX);
- try {
- return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- }
-
-
- /**
- * 新增/修改数据
- *
- * @return 保存结果
- * @throws Exception exception
- */
- public static IndexResponse save(Object o) throws Exception {
- IndexRequest indexRequest = new IndexRequest(INDEX);
- indexRequest.source(JSON.toJSONString(o), XContentType.JSON);
- indexRequest.id();
- return restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- }
-
- /**
- * 设置超时时间
- */
- private static void setTimeout(RestClientBuilder restClientBuilder) {
- restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
- @Override
- public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
- return builder.setConnectTimeout(3000)
- .setSocketTimeout(50000);
- }
- });
- }
- }
-
- import com.alibaba.fastjson.JSON;
- import org.apache.commons.beanutils.BeanUtils;
- import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
- import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
- import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
- import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.action.support.master.AcknowledgedResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.elasticsearch.search.sort.SortOrder;
-
- import java.io.IOException;
- import java.lang.reflect.Method;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- /**
- * @ProjectName: pet_data
- * @Package: com.example.demo.es
- * @ClassName: EsUtils
- * @Author: Devin.W.Zhang
- * @Description: ES 操作工具类
- * @Date: 12/22/2020 10:58 AM
- * @Version: 1.0
- */
- public class EsUtils<T> {
-
- public static final String INDEX = "ly-test-20211210";
-
- private RestHighLevelClient restHighLevelClient;
-
-
- /**
- * 索引是否存在
- *
- * @param indexName 索引名称
- * @return true/false
- */
- public boolean existIndex(String indexName) {
- GetIndexRequest request = new GetIndexRequest();
- request.indices(indexName);
- try {
- return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /**
- * 创建索引,新版ES插入数据时自动创建
- *
- * @param index 索引
- * @return 返回索引创建结果
- * @throws IOException exception
- */
- public CreateIndexResponse createIndex(String index) throws IOException {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
- return restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- }
-
- /**
- * 删除索引
- *
- * @param index 索引
- * @return 返回删除结果
- * @throws IOException exception
- */
- public AcknowledgedResponse deleteIndex(String index) throws IOException {
- DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
- return restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
- }
-
- /**
- * 通过id获取数据
- *
- * @param id 主键
- * @return 返回查询到的结果
- * @throws IOException exception
- */
- public T get(String id, Class<T> classT) throws Exception {
- System.out.println(String.format("get the data from es, id:%s", id));
- GetRequest request = new GetRequest(INDEX, id);
- GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
- Map<String, Object> result = getResponse.getSourceAsMap();
- System.out.println(String.format("the es response:%s", result));
- T t = convertMap2Model(result, classT);
- System.out.println(String.format("change the result to %s: ,the result is :%s", classT, t));
- return t;
- }
-
- /**
- * 新增/修改数据
- *
- * @param t 数据对象
- * @param primaryKey 主键字段名
- * @return 保存结果
- * @throws Exception exception
- */
- public IndexResponse save(T t, String primaryKey) throws Exception {
- IndexRequest indexRequest = new IndexRequest(INDEX);
- indexRequest.source(JSON.toJSONString(t), XContentType.JSON);
- indexRequest.id(getId(t, primaryKey));
- return restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
- }
-
- /**
- * 批量 新增/修改数据
- *
- * @param list 数据list
- * @param primaryKey 主键
- * @return 返回保存结果
- * @throws Exception exception
- */
- public BulkResponse batchSave(List<T> list, String primaryKey) throws Exception {
- BulkRequest bulkRequest = new BulkRequest();
- IndexRequest indexRequest;
- for (T item : list) {
- indexRequest = new IndexRequest(INDEX);
- indexRequest.source(JSON.toJSONString(item), XContentType.JSON);
- indexRequest.id(getId(item, primaryKey));
- bulkRequest.add(indexRequest);
- }
- return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- }
-
- /**
- * 查询符合查询条件的所有数据
- *
- * @param queryBuilder queryBuilder
- * @param orders 排序map
- * @param tClass class
- * @return 返回结果list
- * @throws Exception exception
- */
- public List<T> query(QueryBuilder queryBuilder, Map<String, SortOrder> orders, Class<T> tClass) throws Exception {
- final int currentPage = 1;
- final int pageSize = 100000;
- Object[] obj = pageQuery(currentPage, pageSize, queryBuilder, orders, tClass);
- return (List<T>) obj[1];
- }
-
- /**
- * 分页查询
- * <p>
- * 注意当 (currentPage * pageSize >10000) 该分页查询会报错,
- * 通过postman put方式调用 http://127.0.0.1:9200/_all/_settings?preserve_existing=true 设置 {"index.max_result_window" : "1000000"}
- * 设置成功会返回 {"acknowledged": true}
- * </p>
- *
- * @param currentPage 当前页
- * @param pageSize 每页条数
- * @param queryBuilder queryBuilder
- * @param orders 排序map
- * @param tClass class
- * @return 返回 总条数和数据list
- * @throws Exception exception
- */
- public Object[] pageQuery(int currentPage, int pageSize, QueryBuilder queryBuilder, Map<String, SortOrder> orders, Class<T> tClass) throws Exception {
- Object[] result = new Object[2];
-
- long totalCount;
- List<T> list = new ArrayList<>();
-
- SearchRequest request = new SearchRequest();
- request.indices(INDEX);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.query(queryBuilder);
- request.source(searchSourceBuilder);
-
- //下面的配置为true则如果总条数显示为实际查询到的条数,否则最大只显示10000
- searchSourceBuilder.trackTotalHits(true);
- //起始
- searchSourceBuilder.from((currentPage - 1) * pageSize);
- //默认最大10000
- searchSourceBuilder.size(pageSize);
- //排序
- if (null != orders) {
- for (Map.Entry<String, SortOrder> order : orders.entrySet()) {
- searchSourceBuilder.sort(order.getKey(), order.getValue());
- }
- }
-
- SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- totalCount = res.getHits().getTotalHits().value;
- System.out.println(String.format("totalHits: %s", totalCount));
-
- if (res.getHits().getHits() != null) {
- SearchHit[] hits = res.getHits().getHits();
- for (SearchHit hit : hits) {
- Map<String, Object> resultOne = hit.getSourceAsMap();
- T t = convertMap2Model(resultOne, tClass);
- list.add(t);
- }
- }
- result[0] = totalCount;
- result[1] = list;
- return result;
- }
-
-
- /*下面是一些辅助方法*/
-
- /**
- * 通过反射获取主键的值
- *
- * @param t 对象
- * @param primaryKey 主键字段名称
- * @return 返回主键的值
- * @throws Exception exception
- */
- private String getId(T t, String primaryKey) throws Exception {
- Class<?> aClass = t.getClass();
- String methodName = "get" + (primaryKey.charAt(0) + "").toUpperCase() + primaryKey.substring(1);
- Method method = aClass.getDeclaredMethod(methodName);
- return (String) method.invoke(t);
- }
-
- private T convertMap2Model(Map<String, Object> map, Class<T> classT) throws Exception {
- Object o = classT.newInstance();
- BeanUtils.populate(o, map);
- return (T) o;
- }
-
- /**
- * 关闭连接
- */
- public void close() {
- try {
- if (restHighLevelClient != null) {
- restHighLevelClient.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
扩展
范围查询+精确查询
- SearchRequest request = new SearchRequest();
- request.indices(index);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- QueryBuilder queryBuilder = QueryBuilders.boolQuery()
- .must(QueryBuilders.rangeQuery("time")
- .gte(startTime)
- .lte(endTime))
- .must(QueryBuilders.termQuery("age","18));
- searchSourceBuilder.query(queryBuilder);
- searchSourceBuilder.size(10);
- searchSourceBuilder.from(0);
- request.source(searchSourceBuilder);
- SearchResponse res = restHighLevelClient.search(request, RequestOptions.DEFAULT);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。