赞
踩
建议elastic用7.16.2版本(即使是用了其他比7.16更高的版本,基本语法是不会改变的,所以降到7.16并不会有非常大的影响),而且插件也只支持到7.16.2。
链接: https://gitee.com/muyangrenOvo/elastic-boot
<!--elasticsearch7.16.2依赖包-->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.16.2</version>
</dependency>
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.16.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.12.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.12.3</version>
</dependency>
#elasticsearch-ip-端口-索引-分词器(如下)-是否需要账号密码
#hanlp: hanlp默认分词
#hanlp_standard: 标准分词
#hanlp_index: 索引分词
#hanlp_n_short: N-最短路分词
#hanlp_dijkstra: 最短路分词
#hanlp_speed: 极速词典分词
elasticsearch:
ip: 192.168.xxx.xxx:9200
index: transfer
analyzer: hanlp_index
isNeedAccount: false
import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.client.RestClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author muyangren * @Auther: muyangren * @Date: 2022/6/30 * @Description: @Configuration 类似 xml-bean * @Version: 1.0 */ @Configuration public class ElasticSearchClientConfig { @Value("${elasticsearch.ip}") private String ip; @Value("${elasticsearch.isNeedAccount}") private boolean isNeedAccount; @Bean public ElasticsearchClient elasticsearchClient() { ElasticsearchClient client; //是否需要账号密码 if (isNeedAccount){ CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "admin@123456")); RestClient restClient = RestClient.builder(this.getElasticSearchHttpHosts()).setHttpClientConfigCallback(httpAsyncClientBuilder -> { httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); return httpAsyncClientBuilder; }).build(); ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); client = new ElasticsearchClient(transport); }else { // Create the low-level client RestClient restClient = RestClient.builder(this.getElasticSearchHttpHosts()).build(); // Create the transport with a Jackson mapper ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); // And create the API 
client client = new ElasticsearchClient(transport); } return client; } /** * ElasticSearch 连接地址 * 多个逗号分隔 * 示例:127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203 */ private HttpHost[] getElasticSearchHttpHosts() { String[] hosts = ip.split(","); HttpHost[] httpHosts = new HttpHost[hosts.length]; for (int i = 0; i < httpHosts.length; i++) { String host = hosts[i]; httpHosts[i] = new HttpHost(host.split(":")[0], Integer.parseInt(host.split(":")[1])); } return httpHosts; } }
import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ElasticsearchException; import co.elastic.clients.elasticsearch._types.Result; import co.elastic.clients.elasticsearch._types.mapping.Property; import co.elastic.clients.elasticsearch._types.mapping.TextProperty; import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; import co.elastic.clients.elasticsearch.indices.CreateIndexResponse; import co.elastic.clients.elasticsearch.indices.IndexSettings; import co.elastic.clients.transport.endpoints.BooleanResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.modules.talk.entity.TransEsContent; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author guangsheng * @Date: 2022/7/14 * @Description: 重构ES方法,解决代码重复问题 方法大致如此,细节需要自己去打磨 * @Version: 1.0 */ @Component public class ElasticSearchUtils { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class); /** * 判断索引(index)是否存在 */ public static Boolean whetherIndex(ElasticsearchClient esClient,String index){ BooleanResponse getIndexResponse; boolean whetherIndex = false; try { getIndexResponse = esClient.indices().exists(e -> e.index(index)); whetherIndex = getIndexResponse.value(); } catch (IOException e) { logger.info("es服务器超时,【{}】索引查询失败,请检查es服务器是否启动,该功能不影响后续代码执行。",index); } return whetherIndex; } /** * 添加索引 * 分析器主要有两种情况会被使用: * 第一种是插入文档时,将text类型的字段做分词然后插入倒排索引, * 第二种就是在查询时,先对要查询的text类型的输入做分词,再去倒排索引搜索 * analyzer: 分词器 * searchAnalyzer: 查询分词器 */ public static Boolean 
addIndex(ElasticsearchClient esClient,String index,String analyzer){ boolean whetherTrue =false; try { // 配置索引 Map<String, Property> property = new HashMap<>(1); property.put("onlyOneBest", new Property(new TextProperty.Builder() .analyzer(analyzer) .searchAnalyzer(analyzer) .index(true) .store(true) .build())); TypeMapping typeMapping = new TypeMapping.Builder() .properties(property) .build(); IndexSettings indexSettings = new IndexSettings.Builder() .numberOfShards("5") .build(); CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder() .index(index) .mappings(typeMapping) .settings(indexSettings) .build(); CreateIndexResponse createIndexResponse = esClient.indices().create(createIndexRequest); whetherTrue = Boolean.TRUE.equals(createIndexResponse.acknowledged()); } catch (IOException e) { logger.info("es服务器超时,请检查es服务器是否启动,该功能不影响后续代码执行"); } catch (ElasticsearchException e) { logger.info("同步服务器--es服务器未启用,或者版本错误导致"); } return whetherTrue; } /** * ES添加方法 */ public static void addEsTransfer(ElasticsearchClient esClient, TransEsContent transEsContent , String index) { //存储到ES中 try { CreateResponse createResponse = esClient.create(e -> e .index(index) .id(String.valueOf(transEsContent.getId())) .document(transEsContent)); Result result = createResponse.result(); if (Func.isNotEmpty(result)) { logger.info("【id】:{},es添加数据:{}", transEsContent.getId(), result); } } catch (IOException e) { logger.info("【id】:{}未添加到es服务器,es服务器超时,请检查es服务器是否启动,该功能不影响后续代码执行", transEsContent.getId()); } catch (ElasticsearchException e) { logger.info("添加---es服务器未启用,或者版本错误导致"); } } /** * 同步所有数据 * 批量超过一万条会超时,这里使用分页解决,每次插入五百条 */ public static void addAllEsTransfer(ElasticsearchClient esClient,List<TransEsContent> transList,String index) { BulkRequest.Builder builder = new BulkRequest.Builder(); for (TransEsContent trans : transList) { builder.operations(op -> op .index(idx -> idx .index(index) .id(String.valueOf(trans.getId())) .document(trans))); } BulkResponse result; try { result = 
esClient.bulk(builder.build()); if (Func.isNotEmpty(result)) { logger.info("数据同步到es服务器,数量为:" + result.took()); } } catch (IOException e) { logger.info("es服务器超时,请检查es服务器是否启动,该功能不影响后续代码执行"); } catch (ElasticsearchException e) { logger.info("同步服务器--es服务器未启用,或者版本错误导致"); } } /** * ES更新方法 */ public static void updateEsTransfer(TransEsContent transEsContent, ElasticsearchClient esClient, String index) { try { //先查看是否存在ES服务中 GetRequest.Builder builder = new GetRequest.Builder(); GetRequest transfer = builder.index(index).id(String.valueOf(transEsContent.getId())).build(); GetResponse<TransEsContent> getResponse = esClient.get(transfer, TransEsContent.class); TransEsContent source = null; if (Func.isNotEmpty(getResponse)) { source = getResponse.source(); } //先根据id查询是否存在该数据 if (Func.isEmpty(source)) { CreateResponse createResponse = esClient.create(e -> e .index(index) .id(String.valueOf(transEsContent.getId())) .document(transEsContent)); Result result = createResponse.result(); if (Func.isNotEmpty(result)) { logger.info("【id】:{},es添加数据:{}", transEsContent.getId(), result); } } else { //更新到es服务器中 UpdateResponse<TransEsContent> updateResponse; updateResponse = esClient.update(e -> e .index(index) .id(String.valueOf(transEsContent.getId())) .doc(transEsContent) , TransEsContent.class); Result result = updateResponse.result(); if (Func.isNotEmpty(result)) { logger.info("【id】:{},es更新数据:{}", transEsContent.getId(), result); } } } catch (IOException e) { logger.info("【id】:{}未更新到es服务器,es服务器超时,请检查es服务器是否启动,该功能不影响后续代码执行", transEsContent.getId()); } catch (ElasticsearchException e) { logger.info("更新---es服务器未启用,或者版本错误导致"); } } /** * 异步删除ES服务器中的数据 */ public static void asyncDeleteEsTrans(List<TransEsContent> list, ElasticsearchClient esClient, String index) { new Thread(() -> { try { ArrayList<BulkOperation> bulkOperations = new ArrayList<>(); for (TransEsContent trans : list) { bulkOperations.add(new BulkOperation.Builder() .delete(d -> d .id(String.valueOf(trans.getId())) 
.index(index)) .build()); } BulkResponse bulkResponse = esClient.bulk(e -> e.index(index).operations(bulkOperations)); List<BulkResponseItem> items = bulkResponse.items(); if (Func.isNotEmpty(items)) { logger.info("【id】:{},调用ES批量删除返回数据:{}", list.get(0).getId(), items); } } catch (IOException e) { logger.info("es服务器超时,【id】:{}未同步到es服务器,请检查es服务器是否启动,该功能不影响后续代码执行。", list.get(0).getId()); } catch (ElasticsearchException e) { logger.info("删除---es服务器未启用,或者版本错误导致"); } }).start(); } }
@Value("${elasticsearch.analyzer}") private String analyzer; @Value("${elasticsearch.index}") private String index; @Autowired private ElasticsearchClient esClient; /** * 初始化数据 * @return */ @Override public Boolean initEsData() { //1、查询索引是否存在 Boolean whetherTrue = ElasticSearchUtils.whetherIndex(esClient, index); if (!whetherTrue) { //2、不存在先添加索引在同步数据 whetherTrue = ElasticSearchUtils.addIndex(esClient, index, analyzer); } //3、获取需要添加的总数(select count(1) from table where ... ) Integer transCount =500; //4、用框架自带的分页方法 Query query = new Query(); //4.1、每次查询500 query.setSize(500); long pages = transCount / query.getSize(); if (transCount % query.getSize() != 0L) { ++pages; } for (int i = 1; i <= pages; i++) { query.setCurrent(i); List<xxxx> transList = baseMapper.selectPage(Condition.getPage(query)); if (Func.isNotEmpty(transList)) { ElasticSearchUtils.addAllEsTransfer(esClient, esTransList, index); } } return whetherTrue; } /** * 分页(细节方面需要自己打磨,这里只是展示如何使用到hanlp) * @param transEsSearchVo * @return */ @Override public EsTransMap selectEsTransPage(TransEsSearchVo transEsSearchVo) { StopWatch sw = new StopWatch(); sw.start(); int current; if (transEsSearchVo.getPageNo() < 1) { transEsSearchVo.setPageNo(1); } if (transEsSearchVo.getPageSize() < 1) { transEsSearchVo.setPageSize(10); } if (transEsSearchVo.getPageNo() == 1) { current = transEsSearchVo.getPageNo() - 1; } else { current = (transEsSearchVo.getPageNo() - 1) * transEsSearchVo.getPageSize(); } //1、过滤关键字的特殊符号 String regEx="[\n`~!@#$%^&*()+=|{}':;',\\[\\].<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。, 、?]"; Pattern p = Pattern.compile(regEx); Matcher m = p.matcher(transEsSearchVo.getKeyword()); transEsSearchVo.setKeyword(m.replaceAll("").trim()); //2、构造过滤请求 SearchRequest.Builder builder = new SearchRequest.Builder(); SearchRequest specificContent; specificContent = builder .index(index) .query(q -> q .bool(b -> { //3、查询关键字(对应字段) b.must( t-> t.match(v -> v.field("onlyOneBest").query(FieldValue.of(transEsSearchVo.getKeyword())))); return 
b; }) ) //4、命中关键字高亮 .highlight(h ->h.fields("onlyOneBest",f->f.preTags("<font color='red'>").postTags("</font>"))) //5、排序 .sort(sort -> sort.field(f->f.field(EsConstant.SORT_FILED_TALK_CREATE_TIME).order(SortOrder.Desc))) //从第from开始每次查询size个(第一条数据下标为0) .from(current) .size(transEsSearchVo.getPageSize()) .build(); SearchResponse<TransEsContent> searchResponse; EsTransMap esTransMap = new EsTransMap(); try { searchResponse = esClient.search(specificContent, TransEsContent.class); List<TransEsContent> transEsList = new ArrayList<>(); //6、获取高亮部分 List<Hit<TransEsContent>> hits = searchResponse.hits().hits(); if (Func.isNotEmpty(hits)) { hits.forEach( h -> { assert h.source() != null; //7、处理高亮部分 transEsList.add(handleContentAndTitle(h.source(),h.highlight())); }); } sw.stop(); esTransMap.setRecords(transEsList); esTransMap.setQueryTime(sw.getTotalTimeMillis()); esTransMap.setTotal(searchResponse.hits().total().value()); } catch (IOException e) { throw new ServiceException("ES服务器未启用,无法使用搜索功能"); } return esTransMap; }
以上就是部分代码,如果想更好看实现效果、可以拉下代码到本地跑。
没有部署es服务器的可以参考下该篇文章 链接: docker-compose部署elasticsearch+hanlp分词器
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。