赞
踩
1、导入依赖
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>7.10.2</version>
- </dependency>
2、配置application.yml
es集群配置
- es:
- uris: http://XXX.XXX.XXX.XXX:9200,http://XXX.XXX.XXX.XXX:9200,http://XXX.XXX.XXX.XXX:9200
- connectTimeout: 10000
- soTimeout: 5000
第二种
- es:
- url: xxx.xxx.xxx.xxx;xxx.xxx.xxx.xxx;xxx.xxx.xxx.xxx
- port: 9200
- connectTimeout: 10000
- soTimeout: 5000
单个es配置
- es:
- url: xxx.xxx.xxx.xxx
- port: 9200
- connectTimeout: 10000
- soTimeout: 5000
3、初始化Es配置
es集群初始化配置
-
- import lombok.Getter;
- import lombok.Setter;
- import org.apache.http.HttpHost;
- import org.opensearch.client.RestClient;
- import org.opensearch.client.RestClientBuilder;
- import org.opensearch.client.RestHighLevelClient;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- import java.util.Arrays;
-
- @Component
- public class ElasticsearchConfig implements InitializingBean {
-
- @Setter
- @Getter
- @Value("${els.event.uris:}")
- private String[] uris;
-
- @Setter
- @Getter
- @Value("${els.event.connectTimeout:1000}")
- private Integer connectTimeout;
-
- @Setter
- @Getter
- @Value("${els.event.soTimeout:1000}")
- private Integer soTimeout;
-
-
- /**
- * After properties set.
- *
- */
- @Override
- public void afterPropertiesSet() {
- }
-
- /**
- * To string string.
- *
- * @return the string
- */
- @Override
- public String toString() {
- return "ElasticsearchConfig{" + "uris='" + Arrays.toString(uris) + '\'' + ", port='" + port + '\'' + '}';
- }
-
- @Bean(destroyMethod = "close", name = "client")
- public RestHighLevelClient initRestClient() {
- HttpHost[] httpHosts = Arrays.stream(uris).map(HttpHost::create).toArray(HttpHost[]::new);
- RestClientBuilder builder = RestClient.builder(httpHosts)
- .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
- .setConnectTimeout(connectTimeout)
- .setSocketTimeout(soTimeout)
- );
- return new RestHighLevelClient(builder);
- }
- }
第二种,自己拼接
-
- import lombok.Getter;
- import lombok.Setter;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- import java.util.Arrays;
-
- @Component
- public class ElasticsearchConfig implements InitializingBean {
-
- @Setter
- @Getter
- @Value("${es.url:}")
- private String url;
-
- @Setter
- @Getter
- @Value("${es.schema:http}")
- private String schema;
-
- @Setter
- @Getter
- @Value("${es.port:9200}")
- private Integer port;
-
- @Setter
- @Getter
- @Value("${es.connectTimeout:1000}")
- private Integer connectTimeout;
-
- @Setter
- @Getter
- @Value("${es.soTimeout:1000}")
- private Integer soTimeout;
-
-
- /**
- * After properties set.
- *
- */
- @Override
- public void afterPropertiesSet() {
- }
-
- /**
- * To string string.
- *
- * @return the string
- */
- @Override
- public String toString() {
- return "ElasticsearchConfig{" + "url='" + url + '\'' + ", port='" + port + '\''
- + '}';
- }
-
- @Bean(destroyMethod = "close", name = "client")
- public RestHighLevelClient initRestClient() {
- String[] array = url.split(SeparatorConstants.SEMI_COLON);
- HttpHost[] hosts = new HttpHost[array.length];
- for (int i = 0; i < array.length; i++) {
- hosts[i] = new HttpHost(array[i], port, schema);
- }
- RestClientBuilder builder = RestClient.builder(hosts)
- .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
- .setConnectTimeout(connectTimeout)
- .setSocketTimeout(soTimeout)
- );
- return new RestHighLevelClient(builder);
- }
- }
单个es初始化配置
-
- import lombok.Getter;
- import lombok.Setter;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- import java.util.Arrays;
-
- @Component
- public class ElasticsearchConfig implements InitializingBean {
-
- @Setter
- @Getter
- @Value("${es.url:}")
- private String url;
-
- @Setter
- @Getter
- @Value("${es.schema:http}")
- private String schema;
-
- @Setter
- @Getter
- @Value("${es.port:9200}")
- private Integer port;
-
- @Setter
- @Getter
- @Value("${es.connectTimeout:1000}")
- private Integer connectTimeout;
-
- @Setter
- @Getter
- @Value("${es.soTimeout:1000}")
- private Integer soTimeout;
-
-
- /**
- * After properties set.
- *
- */
- @Override
- public void afterPropertiesSet() {
- }
-
- /**
- * To string string.
- *
- * @return the string
- */
- @Override
- public String toString() {
- return "ElasticsearchConfig{" + "url='" + url + '\'' + ", port='" + port + '\''
- + '}';
- }
-
- @Bean(destroyMethod = "close", name = "client")
- public RestHighLevelClient initRestClient() {
- RestClientBuilder builder = RestClient.builder(new HttpHost(url, port, schema))
- .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
- .setConnectTimeout(connectTimeout)
- .setSocketTimeout(soTimeout)
- );
- return new RestHighLevelClient(builder);
- }
- }
4、建立实体类
5、增删改方法
推荐学习示例: SpringBoot-Learn/elasticsearch at master · wupeixuan/SpringBoot-Learn · GitHub
注意坑点:查询方法默认查询10条,不管分不分页查询,因此,注意给size够用的大小
searchSourceBuilder.size(userIds.size());
-
- import com.alibaba.fastjson.JSON;
- import org.apache.commons.collections.CollectionUtils;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
-
- @Service
- public class EsServiceImpl implements EsService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(EsService.class);
-
- @Autowired
- private RestHighLevelClient client;
-
- private final String INDEX = "index"; // 索引名称
-
- /**
- * 单条添加
- * @param document
- * @return {@link Boolean}
- */
- @Override
- public Boolean addDocument(UserDocument document) {
- LOGGER.info("Add Document begin");
- // .id 给每个Document生成id 可以根据id搜素和删除,不用es自动生成的id
- IndexRequest indexRequest = new IndexRequest(INDEX).id(document.getId()).source(JSON.toJSONString(document), XContentType.JSON);
- try {
- IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
- LOGGER.info("Add Document end");
- return response.status().equals(RestStatus.OK);
- } catch (IOException exception) {
- LOGGER.error("Add Document Exception: ", exception);
- }
- return false;
- }
-
- /**
- * 批量添加
- * @param list
- * @return {@link Boolean}
- */
- @Override
- public Boolean bulkDocument(List<UserDocument> list) {
- LOGGER.info("Bulk Document begin");
- BulkRequest bulkRequest = new BulkRequest();
- try {
- if (CollectionUtils.isNotEmpty(list)) {
- for (UserDocument document: list) {
- IndexRequest indexRequest = new IndexRequest(INDEX).id(document.getId()).source(JSON.toJSONString(document), XContentType.JSON);
- bulkRequest.add(indexRequest);
- }
- BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
- LOGGER.info("Bulk Document end");
- return response.status().equals(RestStatus.OK);
- }
- } catch (IOException exception) {
- LOGGER.error("Bulk Document Exception: ", exception);
- }
- return false;
- }
-
-
- /**
- * 单条更新
- * @param document
- * @return {@link Boolean}
- */
- @Override
- public Boolean update(UserDocument document) {
- LOGGER.info("Bulk Update begin");
- try {
- UpdateRequest updateRequest = new UpdateRequest(INDEX, document.getId()).doc(JSON.toJSONString(document), XContentType.JSON);
- UpdateResponse update = client.update(updateRequest, RequestOptions.DEFAULT);
- LOGGER.info("Bulk Update end");
- return update.status().equals(RestStatus.OK);
- } catch (IOException exception) {
- LOGGER.error("Bulk Update Exception: ", exception);
- }
- return false;
- }
-
- /**
- * 批量更新
- * @param list
- * @return {@link Boolean}
- */
- @Override
- public Boolean bulkUpdate(List<UserDocument> list) {
- LOGGER.info("Bulk Update begin");
- BulkRequest bulkRequest = new BulkRequest();
- try {
- if (CollectionUtils.isNotEmpty(list)) {
- for (UserDocument document : list) {
- UpdateRequest updateRequest = new UpdateRequest(INDEX, document.getId()).doc(JSON.toJSONString(document), XContentType.JSON);
- bulkRequest.add(updateRequest);
- }
- BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
- LOGGER.info("Bulk Update end");
- return response.status().equals(RestStatus.OK);
- }
- } catch (IOException exception) {
- LOGGER.error("Bulk Update Exception: ", exception);
- }
- return false;
- }
-
-
- /**
- * 根据某条件批量删除
- * @param userIds
- */
- @Override
- public void deleteDocumentByUserIds(List<Long> userIds) {
- LOGGER.info("Bulk Delete begin");
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices(INDEX);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- boolQueryBuilder.must(QueryBuilders.termsQuery("userId", userIds));
- searchSourceBuilder.size(userIds.size());
- searchSourceBuilder.query(boolQueryBuilder);
- searchRequest.source(searchSourceBuilder);
- SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
- SearchHit[] searchHit = search.getHits().getHits();
- List<UserDocument> list = new ArrayList<>();
- if (searchHit.length > 0) {
- for (SearchHit hit : searchHit) {
- list.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
- }
- }
- BulkRequest bulkRequest = new BulkRequest();
- try {
- if (CollectionUtils.isNotEmpty(list)) {
- for (UserDocument document : list) {
- DeleteRequest deleteRequest = new DeleteRequest(INDEX, document.getId());
- bulkRequest.add(deleteRequest);
- }
- client.bulk(bulkRequest, RequestOptions.DEFAULT);
- }
- LOGGER.info("Bulk Delete end");
- } catch (IOException exception) {
- LOGGER.error("Bulk Delete Exception: ", exception);
- }
- }
- }
-
6、查询方法
普通查询--根据某条件查询
- @Override
- public List<UserDocument> getUserDocuments(List<Long> userIds) throws IOException {
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices(INDEX);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- boolQueryBuilder.must(QueryBuilders.termsQuery("userId", userIds));
- searchSourceBuilder.size(userIds.size());
- searchSourceBuilder.query(boolQueryBuilder);
- searchRequest.source(searchSourceBuilder);
- SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
- SearchHit[] searchHit = search.getHits().getHits();
- List<UserDocument> list = new ArrayList<>();
- if (searchHit.length > 0) {
- for (SearchHit hit : searchHit) {
- list.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
- }
- }
- return list;
- }
分页查询
如若不知道怎么写,可用sql转es
- public BaseModelPageDTO<RestEventEs> baseSearch(EventPageSearchQuery query) {
- BaseModelPageDTO<RestEventEs> basePageDTO = new BaseModelPageDTO<>();
- try {
- LOGGER.info("search begin");
- LOGGER.info("index = {}", INDEX);
- // 1.创建 SearchRequest搜索请求
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices(INDEX);
-
- // 2.创建 SearchSourceBuilder条件构造。
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
-
- // 查询条件
- BoolQueryBuilder rootFilter = new BoolQueryBuilder();
- BoolQueryBuilder filterQuery = QueryBuilders.boolQuery();
- filterQuery.must(QueryBuilders.matchPhraseQuery("status", true));
- BoolQueryBuilder boolQueryBuilder1 = QueryBuilders.boolQuery();
- // wildcardQuery 使用模糊查询要加.keyword才能查询出来
- boolQueryBuilder1.should(QueryBuilders.wildcardQuery("username.keyword", "niki"));
- boolQueryBuilder1.should(QueryBuilders.matchQuery("phone", "1231234"));
- filterQuery.must(boolQueryBuilder1);
- BoolQueryBuilder mineOrAuthedQuery = new BoolQueryBuilder();
- filterQuery.must(QueryBuilders.termsQuery("appId", "wechat"));
- mineOrAuthedQuery.should(filterQuery);
-
- // 3.将 SearchSourceBuilder 添加到 SearchRequest中
- boolQueryBuilder.filter(rootFilter);
- searchSourceBuilder.query(rootFilter);
-
- // 折叠重复的数据
- searchSourceBuilder.collapse(new CollapseBuilder("id"));
- AggregationBuilder aggregation = AggregationBuilders.cardinality("total_size").field("id");
- searchSourceBuilder.aggregation(aggregation);
-
- // 设置分页
- searchSourceBuilder.from((query.getPageNum() - 1) * query.getPageSize());
- searchSourceBuilder.size(query.getPageSize());
-
- // 排序
- searchSourceBuilder.sort("createDate", SortOrder.ASC);
- searchSourceBuilder.sort("updateDate", SortOrder.DESC);
- searchRequest.source(searchSourceBuilder);
-
- // 4.执行查询
- SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
-
- Aggregations aggregations = searchResponse.getAggregations();
- Cardinality cardinality = aggregations.get("total_size");
- List<RestEventEs> restEventEs = this.getRestEventEs(searchResponse);
-
- setBaseSearchPageDto(basePageDTO, restEventEs, cardinality, query);
- LOGGER.info("search end");
- return basePageDTO;
- } catch (IOException exception) {
- LOGGER.error("search error", exception);
- }
- return basePageDTO;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。