赞
踩
记录一下springBoot集成Elasticsearch的基本使用。
目录
RestHighLevelClient是官方指定的连接API, 在springBoot项目中使用RestHighLevelClient来连接es并实现增删改查。
- <!-- 其他依赖这里省略 -->
- <properties>
- <elasticsearch.version>7.4.2</elasticsearch.version>
- </properties>
- <dependencies>
- <!-- ElasticSearch 相关依赖 start-->
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
- <dependencies>
- ### ES连接信息配置 ###
- # 单机版ip
- elasticsearch.port=127.0.0.1
- # 单机版端口号
- elasticsearch.host=9200
- # 是否开启集群模式
- opencluster=false
- #集群模式各节点IP和端口号
- elasticsearch.hostAndPorts=127.0.0.1:9201;127.0.0.1:9202;127.0.0.1:9203
配置类中对RestHighLevelClient对象进行初始化并注入到容器
- package com.zhh.demo.commons.elasticsearch.config;
-
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * ES基础配置
- * @author zhaoheng
- * @date 2021/05/24
- */
- @Configuration
- public class ElasticSearchConfig {
- protected static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);
- /** 单机版端口号 */
- @Value("${elasticsearch.host}")
- public String host;
-
- /** 单机版ip */
- @Value("${elasticsearch.port}")
- public int port;
-
- /** 是否开启集群模式 */
- @Value("${opencluster}")
- public boolean openCluster;
-
- /** 集群各节点的IP和端口号 */
- @Value("${elasticsearch.hostAndPorts}")
- private String hostAndPorts;
-
- /**
- * ES客户端配置并注入到容器
- */
- @Bean
- public RestHighLevelClient restHighLevelClient() {
- RestHighLevelClient client = null;
- if (openCluster) {
- logger.info("开始初始化集群模式RestHighLevelClient连接...");
- final HttpHost[] httpHosts = getHttpHosts();
- if (httpHosts != null && httpHosts.length > 0) {
- logger.info("获取到的配置文件为空");
- return client;
- }
- try {
- client = new RestHighLevelClient(RestClient.builder(httpHosts));
- } catch (Exception ex) {
- logger.error("初始化ES连接失败,失败信息:{}", ex.getMessage());
- }
- } else {
- final HttpHost singleNode = new HttpHost(host, port, "http");
- logger.info("开始初始化非集群模式RestHighLevelClient连接...");
- client = new RestHighLevelClient(
- RestClient.builder(singleNode));
- }
- return client;
- }
-
- /**
- * 获取配置文件中的es服务器地址信息
- * @return
- */
- private HttpHost[] getHttpHosts() {
- HttpHost[] httpHosts = null;
- if (hostAndPorts != null && !"".equals(hostAndPorts)) {
- final String[] hostAndPort = hostAndPorts.split(";");
- if (httpHosts != null && hostAndPort.length > 0) {
- httpHosts = new HttpHost[hostAndPort.length];
- for (int i = 0; i < hostAndPort.length; i++) {
- String hostAndPortItem = hostAndPort[i];
- final String[] singleHostAndPort = hostAndPortItem.split(":");
- final String hostIP = singleHostAndPort[0];
- final String port = singleHostAndPort[1];
- logger.info("获取到的主机是:host:{},port:{}", hostIP, port);
- final HttpHost nodes = new HttpHost(hostIP, Integer.parseInt(port), "http");
- httpHosts[i] = nodes;
- }
- }
- }
- return httpHosts;
- }
- }
后续直接调用即可
- package com.zhh.demo.commons.elasticsearch;
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.zhh.demo.commons.utils.JSONDataUtil;
- import com.zhh.demo.commons.utils.ToolUtil;
- import com.zhh.demo.domain.ProductESDTO;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.delete.DeleteResponse;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.indices.CreateIndexRequest;
- import org.elasticsearch.client.indices.CreateIndexResponse;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.index.reindex.BulkByScrollResponse;
- import org.elasticsearch.index.reindex.DeleteByQueryRequest;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- /**
- * 商品ES搜索相关
- * @author zhaoheng
- * @date 2021/05/24
- */
- @Component
- public class ProductServiceEs {
- private static final Logger logger = LoggerFactory.getLogger(ProductServiceEs.class);
-
- /** 索引名称 */
- public static final String MY_INDEX ="my_index";
-
- @Autowired
- private RestHighLevelClient restHighLevelClient;
-
- /**
- * 创建索引并添加mapping字段映射
- * @throws Exception
- */
- public boolean createIndex()throws Exception{
- logger.info("创建ES索引开始");
- CreateIndexRequest request = new CreateIndexRequest(MY_INDEX);
- request.mapping(this.mappingBuilder());
- // 设置分片和分片副本
- request.settings(Settings.builder()
- .put("index.number_of_shards",5)
- .put("index.number_of_replicas",1));
- CreateIndexResponse response = restHighLevelClient.indices().create(request,RequestOptions.DEFAULT);
- logger.info("创建ES索引结束,结果:{}",response.isAcknowledged());
- return response.isAcknowledged();
- }
-
- /**
- * es的字段mapping设置
- * 设置es索引字段属性,是否分词,使用的什么分词器
- * @return
- * @throws Exception
- */
- private XContentBuilder mappingBuilder() throws Exception{
- XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().prettyPrint()
- .startObject()
- .startObject("properties")
-
- // 商品id
- .startObject("productId")
- // 字段类型text类型,不需要建立分词索引
- .field("type", "keyword")
- .endObject()
-
- // 商品名称
- .startObject("productName")
- // 字段类型text类型,商品名称需要根据关键字搜索,使用需要设置成text类型,需要建立索引分词
- .field("type", "text")
- /**
- * index定义字段的分析类型以及检索方式,控制字段值是否被索引.他可以设置成 true 或者 false。
- * 没有被索引的字段将无法搜索,默认都是true
- * */
- .field("index", "true")
- // 使用ik分词器
- .field("analyzer","ik_max_word")
- .endObject()
-
- // 搜索提示、自动补全,参考es的Completion Suggester api
- .startObject("titleList")
- .field("type", "completion")
- .endObject()
-
- .endObject()
- .endObject();
- return xContentBuilder;
- }
-
- /**
- * 判断索引是否存在
- * @return
- * @throws Exception
- */
- public Boolean isExistsIndex() throws Exception{
- GetIndexRequest request = new GetIndexRequest(MY_INDEX);
- return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
- }
-
- /**
- * id相同,数据存在就覆盖,不存在就添加
- * @throws IOException
- */
- public boolean save(ProductESDTO product) throws Exception {
- logger.info("ES添加数据,data:{}", JSONDataUtil.toJSONStr(product));
- IndexRequest request = new IndexRequest(MY_INDEX);
- request.id(product.getProductId());
- request.source(JSONDataUtil.toJSONStr(product), XContentType.JSON);
- // 添加数据索引
- IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
- logger.info(indexResponse.toString());
- return RestStatus.CREATED.equals(indexResponse.status());
- }
-
- /**
- * 批量添加数据
- * @param list
- * @throws Exception
- */
- public boolean bulkSave(List<ProductESDTO> list) throws Exception {
- logger.info("ES-批量添加数据start");
- IndexRequest request = null;
- BulkRequest bulkRequest = new BulkRequest();
- for (ProductESDTO product : list) {
- request = new IndexRequest(MY_INDEX);
- request.id(String.valueOf(product.getProductId()));
- request.source(JSONDataUtil.toJSONStr(product), XContentType.JSON);
- bulkRequest.add(request);
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
- logger.info("ES-批量添加数据end");
- return !bulkResponse.hasFailures();
- }
- /**
- * 批量删除数据
- * @param productIds
- * @throws Exception
- */
- public boolean bulkDelete(List<String> productIds) throws Exception {
- logger.info("批量删除数据start");
- DeleteRequest request = null;
- BulkRequest bulkRequest = new BulkRequest();
- for (String product : productIds) {
- request = new DeleteRequest(MY_INDEX);
- request.id(product);
- bulkRequest.add(request);
- }
- BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
- logger.info("批量删除数据end");
- return !bulkResponse.hasFailures();
- }
-
- /**
- * 根据id删除数据
- * @param productId 商品id
- * @throws Exception
- */
- public boolean deleteById(String productId) throws Exception{
- DeleteRequest deleteRequest = new DeleteRequest(MY_INDEX);
- deleteRequest.id(productId);
- DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
- return RestStatus.OK.equals(deleteResponse.status());
- }
-
- /**
- * 删除全部数据
- * @throws Exception
- */
- public boolean deleteAllData() throws Exception{
- logger.info("删除索引{}的全部数据",MY_INDEX);
- DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(MY_INDEX);
- // 删除所有匹配上的数据,查询全部,即删除全部
- deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
- BulkByScrollResponse deleteResponse = restHighLevelClient.deleteByQuery(deleteByQueryRequest,RequestOptions.DEFAULT);
- logger.info("删除索引{}的全部数据,删除条数:{}",MY_INDEX,deleteResponse.getTotal());
- return RestStatus.OK.equals(deleteResponse.getStatus());
- }
-
- /**
- * 搜索并且获取结果
- *
- * @param searchSourceBuilder
- * @param indexName
- * @return
- */
- private SearchResponse searchAndGetResult(SearchSourceBuilder searchSourceBuilder, String indexName) throws IOException {
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices(indexName);
- searchRequest.source(searchSourceBuilder);
- SearchResponse response = null;
- logger.info("ES查询的DSL语句为:{}", searchSourceBuilder.toString());
- try {
- response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
- } catch (Exception ex) {
- logger.error("ES查询时出错,出错信息为:{}", ex.getMessage());
- throw ex;
- }
- if (ToolUtil.isNotEmpty(response)) {
- logger.debug("ES查询返回的数据为:{}", response.toString());
- }
- return response;
- }
-
- /**
- * 获取处理过的查询数据
- *
- * @param response
- * @return
- */
- private static Map<String, Object> getJsonArray(SearchResponse response) {
- final JSONArray jsonArray = new JSONArray();
- final Map<String, Object> resultMap = new HashMap<>();
- if (ToolUtil.isEmpty(response)) {
- return resultMap;
- }
- final SearchHits hits = response.getHits();
- Long count = 0L;
- List<String> ids = new ArrayList<>();
- if (ToolUtil.isNotEmpty(hits)) {
- count = hits.getTotalHits().value;
- final SearchHit[] hitsArray = hits.getHits();
- if (ToolUtil.isNotEmpty(hitsArray)) {
- for (SearchHit hit : hitsArray) {
- final Map<String, Object> sourceAsMap = hit.getSourceAsMap();
- final String id = hit.getId();
- final JSONObject jsonObject = new JSONObject();
- for (Map.Entry<String, Object> entry : sourceAsMap.entrySet()) {
- jsonObject.put(entry.getKey(), entry.getValue());
- }
- jsonArray.add(jsonObject);
- ids.add(id);
- }
- }
- }
- // 满足条件的数据总条数
- resultMap.put("total", count);
- // 查询得到的数据
- resultMap.put("rows", jsonArray);
- // 查询到的数据的id集合
- resultMap.put("ids", ids);
- return resultMap;
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。