当前位置:   article > 正文

ElasticSearch集成SpringBoot实践及数据同步_springboot mysql rocketmq elasticsearchrepository

springboot mysql rocketmq elasticsearchrepository 数据同步

一  前言

ES 全称 Elasticsearch 是一款分布式的全文搜索引擎,在互联网公司中,这款搜索引擎一直被程序员们所推崇。常见的使用场景如ELK日志分析,电商APP的商品推荐,社交APP的同城用户推荐等等。今天结合自己平时的一些学习对它与SpringBoot的基础集成以及一些实际项目中的使用做点小总结。

二  ElasticSearch详细介绍

ElasticSearch是一款非常强大的、基于Lucene的开源搜索及分析引擎;它是一个实时的分布式搜索分析引擎,它能让你以前所未有的速度和规模,去探索你的数据。

在当前软件行业中,搜索是一个软件系统或平台的基本功能, 学习ElasticSearch就可以为相应的软件打造出良好的搜索体验。其次,ElasticSearch具备非常强的大数据分析能力。虽然Hadoop也可以做大数据分析,但是有时候分析数据可能出现等待时间很长的情况。而ElasticSearch的分析能力更为突出,具备Hadoop不具备的能力。在当今大数据时代,掌握近实时的搜索和分析能力,才能掌握核心竞争力,洞见未来。

ElasticSearch的主要功能及应用场景

 

  • 主要功能:

1)海量数据的分布式存储以及集群管理,达到了服务与数据的高可用以及水平扩展;

2)近实时搜索,性能卓越。对结构化、全文、地理位置等类型数据的处理;

3)海量数据的近实时分析(聚合功能)

  • 应用场景:

1)网站搜索、垂直搜索、代码搜索;

2)日志管理与分析、安全指标监控、应用性能监控、Web抓取舆情分析;

ElasticSearch基础概念

  • Near Realtime(NRT) 近实时。数据提交索引后,立马就可以搜索到。

  • Cluster 集群,一个集群由一个唯一的名字标识,默认为“elasticsearch”。集群名称非常重要,具有相同集群名的节点才会组成一个集群。集群名称可以在配置文件中指定。

  • Node 节点:存储集群的数据,参与集群的索引和搜索功能。像集群有名字,节点也有自己的名称,默认在启动时会以一个随机的UUID的前七个字符作为节点的名字,你可以为其指定任意的名字。通过集群名在网络中发现同伴组成集群。一个节点也可是集群。

  • Index 索引: 一个索引是一个文档的集合(等同于solr中的集合)。每个索引有唯一的名字,通过这个名字来操作它。一个集群中可以有任意多个索引。

  • Type 类型:指在一个索引中,可以索引不同类型的文档,如用户数据、博客数据。从6.0.0 版本起已废弃,一个索引中只存放一类数据。

  • Document 文档:被索引的一条数据,索引的基本信息单元,以JSON格式来表示。

  • Shard 分片:当索引上的数据量太大的时候,我们通常会将一个索引上的数据进行水平拆分,拆分出来的每个数据库叫作一个分片。在一个多分片的索引中写入数据时,通过路由来确定具体写入那一个分片中,所以在创建索引时需要指定分片的数量,并且分片的数量一旦确定就不能更改。分片后的索引带来了规模上(数据水平切分)和性能上(并行执行)的提升。每个分片都是Luence中的一个索引文件,每个分片必须有一个主分片和零到多个副本分片。

  • Replication 备份:是指对主分片的备份, 一个分片可以有多个备份(副本)。主分片和备份分片都可以对外提供查询服务,写操作时先在主分片上完成,然后分发到备份上。当主分片不可用时,会在备份的分片中选举出一个作为主分片,所以备份不仅可以提升系统的高可用性能,还可以提升搜索时的并发性能。但是若副本太多的话,在写操作时会增加数据同步的负担。

  • Settings:对集群中索引的定义,比如一个索引默认的分片数、副本数等信息。

  • Mapping:类似于关系型数据库中的表结构信息,用于定义索引中字段(Field)的存储类型、分词方式、是否存储等信息。Elasticsearch中的mapping是可以动态识别的。如果没有特殊需求,则不需要手动创建mapping,因为Elasticsearch会自动根据数据格式识别它的类型,但是当需要对某些字段添加特殊属性(比如:定义使用其他分词器、是否分词、是否存储等)时,就需要手动设置mapping了。一个索引的mapping一旦创建,若已经存储了数据,就不可修改了。

  • Analyzer:字段的分词方式的定义。一个analyzer通常由一个tokenizer、零到多个filter组成。比如默认的标准Analyzer包含一个标准的tokenizer和三个filter:Standard Token Filter、Lower Case Token Filter、Stop Token Filter。

Elasticsearch的节点的分类

  • 主节点(Master Node):也叫作主节点,主节点负责创建索引、删除索引、分配分片、追踪集群中的节点状态等工作。Elasticsearch中的主节点的工作量相对较轻。用户的请求可以发往任何一个节点,并由该节点负责分发请求、收集结果等操作,而并不需要经过主节点转发。通过在配置文件中设置node.master=true来设置该节点成为候选主节点(但该节点不一定是主节点,主节点是集群在候选节点中选举出来的),在Elasticsearch集群中只有候选节点才有选举权和被选举权。其他节点是不参与选举工作的。

  • 数据节点(Data Node):数据节点,负责数据的存储和相关具体操作,比如索引数据的创建、修改、删除、搜索、聚合。所以,数据节点对机器配置要求比较高,首先需要有足够的磁盘空间来存储数据,其次数据操作对系统CPU、Memory和I/O的性能消耗都很大。通常随着集群的扩大,需要增加更多的数据节点来提高可用性。通过在配置文件中设置node.data=true来设置该节点成为数据节点。

  • 客户端节点(Client Node):就是既不做候选主节点也不做数据节点的节点,只负责请求的分发、汇总等,也就是下面要说到的协调节点的角色。其实任何一个节点都可以完成这样的工作,单独增加这样的节点更多地是为了提高并发性。可在配置文件中设置该节点成为数据节点:

  1.  node.master=false
  2.  node.data=false
  • 部落节点(Tribe Node):部落节点可以跨越多个集群,它可以接收每个集群的状态,然后合并成一个全局集群的状态,它可以读写所有集群节点上的数据,在配置文件中通过如下设置使节点成为部落节点:

  1.  tribe:
  2.    one:
  3.      cluster.name: cluster_one
  4.    two:
  5.      cluster.name: cluster_two

 因为Tribe Node要在Elasticsearch 7.0以后移除,所以不建议使用。

  • 协调节点(Coordinating Node):协调节点,是一种角色,而不是真实的Elasticsearch的节点,我们没有办法通过配置项来配置哪个节点为协调节点。集群中的任何节点都可以充当协调节点的角色。当一个节点A收到用户的查询请求后,会把查询语句分发到其他的节点,然后合并各个节点返回的查询结果,最好返回一个完整的数据集给用户。在这个过程中,节点A扮演的就是协调节点的角色。由此可见,协调节点会对CPU、Memory和I/O要求比较高。 集群的状态有Green、Yellow和Red三种,如下所述

  • Green:绿色,健康。所有的主分片和副本分片都可正常工作,集群100%健康。

  • Yellow:黄色,预警。所有的主分片都可以正常工作,但至少有一个副本分片是不能正常工作的。此时集群可以正常工作,但是集群的高可用性在某种程度上被弱化。

  • Red:红色,集群不可正常使用。集群中至少有一个分片的主分片及它的全部副本分片都不可正常工作。这时虽然集群的查询操作还可以进行,但是也只能返回部分数据(其他正常分片的数据可以返回),而分配到这个分片上的写入请求将会报错,最终会导致数据的丢失。

ES和数据库的对比

EDBMSElasticSearch
数据库(database)索引(index)
表(table)类型(type)(6.0.0废弃)
行(row)文档(document)
列(column)字段(field)
表结构(schema)映射(mapping)
索引倒排索引
SQL查询DSL
查询GET ...
更新PUT ...
删除DELETE ...

三  基础实践

因为ES将数据以json格式存储在索引库中,为了方便,实践阶段就不专门建立数据库了,数据直接用ES进行存储(实际不适合数据存储)。基本流程如下:

Step1:导入ES依赖

这里尽量指定ES以来的版本于自己ES服务一致,有时候版本不一致会报错。笔者这里的数据直接从京东爬取了,因此导入了jsoup依赖,做数据爬取。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.6.4</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.yy</groupId>
  12. <artifactId>springboot-demo08-elasticsearch</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>springboot-demo08-elasticsearch</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. <!-- 自定义es版本依赖,保证和本地版本一致-->
  19. <elasticsearch.version>7.6.1</elasticsearch.version>
  20. </properties>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-web</artifactId>
  29. </dependency>
  30. <!-- https://mvnrepository.com/artifact/org.jsoup/jsoup 用于爬取数据-->
  31. <dependency>
  32. <groupId>org.jsoup</groupId>
  33. <artifactId>jsoup</artifactId>
  34. <version>1.14.3</version>
  35. </dependency>
  36. <dependency>
  37. <groupId>com.alibaba</groupId>
  38. <artifactId>fastjson</artifactId>
  39. <version>1.2.78</version>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.springframework.boot</groupId>
  43. <artifactId>spring-boot-devtools</artifactId>
  44. <scope>runtime</scope>
  45. <optional>true</optional>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.springframework.boot</groupId>
  49. <artifactId>spring-boot-configuration-processor</artifactId>
  50. <optional>true</optional>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.projectlombok</groupId>
  54. <artifactId>lombok</artifactId>
  55. <optional>true</optional>
  56. </dependency>
  57. <dependency>
  58. <groupId>org.springframework.boot</groupId>
  59. <artifactId>spring-boot-starter-test</artifactId>
  60. <scope>test</scope>
  61. </dependency>
  62. </dependencies>
  63. <build>
  64. <plugins>
  65. <plugin>
  66. <groupId>org.springframework.boot</groupId>
  67. <artifactId>spring-boot-maven-plugin</artifactId>
  68. <configuration>
  69. <excludes>
  70. <exclude>
  71. <groupId>org.projectlombok</groupId>
  72. <artifactId>lombok</artifactId>
  73. </exclude>
  74. </excludes>
  75. </configuration>
  76. </plugin>
  77. </plugins>
  78. </build>
  79. </project>

Step2:配置ES客户端连接

笔者这里ES在本地,并且没有设置集群,因此只需要设置基本的客户端连接就行了,没有太多花里胡哨的东西。

  1. package com.yy.config;
  2. import org.apache.http.HttpHost;
  3. import org.elasticsearch.client.RestClient;
  4. import org.elasticsearch.client.RestHighLevelClient;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class ElasticSearchClientConfig {
  9.    @Bean
  10.    public RestHighLevelClient restHighLevelClient() {
  11.        return new RestHighLevelClient(
  12.                RestClient.builder(
  13.                        new HttpHost("127.0.0.1", 9200, "http")));
  14.        
  15.   }
  16. }

Step3:利用jsoup编写数据爬取类

在这之前还需要定义一个实体类,封装爬取数据。

  1. package com.yy.config.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @Data
  6. @NoArgsConstructor
  7. @AllArgsConstructor
  8. public class Content {
  9.    private String id;
  10.    private String img;
  11.    private String price;
  12.    private String title;
  13. }

利用jsoup解析京东购物页面的URL,爬取部分数据作为实践数据。

  1. package com.yy.util;
  2. import com.yy.config.pojo.Content;
  3. import org.jsoup.Jsoup;
  4. import org.jsoup.nodes.Document;
  5. import org.jsoup.nodes.Element;
  6. import org.jsoup.select.Elements;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. import java.net.URL;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. public class HtmlParseUtil {
  13.    public static void main(String[] args) throws IOException {
  14.        new HtmlParseUtil().parseJD("java").forEach(System.out::println);
  15.   }
  16.    public List<Content> parseJD(String keywords) throws IOException {
  17.        String url = "https://search.jd.com/Search?keyword=" + keywords;
  18.        Document document = Jsoup.parse(new URL(url), 30000);
  19.        Element element = document.getElementById("J_goodsList");
  20.        Elements li = element.getElementsByTag("li");
  21.        ArrayList<Content> goodslist = new ArrayList<>();
  22.        for (Element el : li) {
  23.            String img = el.getElementsByTag("img").eq(0).attr("data-lazy-img");
  24.            String price = el.getElementsByClass("p-price").eq(0).text();
  25.            String title = el.getElementsByClass("p-name").eq(0).text();
  26.            System.out.println("=====================");
  27.            System.out.println(img);
  28.            System.out.println(price);
  29.            System.out.println(title);
  30.            Content content = new Content();
  31.            content.setImg(img);
  32.            content.setPrice(price);
  33.            content.setTitle(title);
  34.            goodslist.add(content);
  35.       }
  36.        return goodslist;
  37.   }
  38. }

Step4:实现基础业务功能

在业务层中实现基础增删改查操作,并将爬取数据放入ES索引中。

  1. package com.yy.config.Service;
  2. import com.alibaba.fastjson.JSON;
  3. import com.yy.config.pojo.Content;
  4. import com.yy.util.HtmlParseUtil;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.elasticsearch.action.bulk.BulkRequest;
  7. import org.elasticsearch.action.bulk.BulkResponse;
  8. import org.elasticsearch.action.delete.DeleteRequest;
  9. import org.elasticsearch.action.delete.DeleteResponse;
  10. import org.elasticsearch.action.index.IndexRequest;
  11. import org.elasticsearch.action.index.IndexResponse;
  12. import org.elasticsearch.action.search.SearchRequest;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.action.update.UpdateRequest;
  15. import org.elasticsearch.action.update.UpdateResponse;
  16. import org.elasticsearch.client.RequestOptions;
  17. import org.elasticsearch.client.RestHighLevelClient;
  18. import org.elasticsearch.common.text.Text;
  19. import org.elasticsearch.common.unit.TimeValue;
  20. import org.elasticsearch.common.xcontent.XContentType;
  21. import org.elasticsearch.index.query.QueryBuilders;
  22. import org.elasticsearch.index.query.TermQueryBuilder;
  23. import org.elasticsearch.search.SearchHit;
  24. import org.elasticsearch.search.builder.SearchSourceBuilder;
  25. import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
  26. import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.stereotype.Service;
  29. import java.io.IOException;
  30. import java.util.ArrayList;
  31. import java.util.List;
  32. import java.util.Map;
  33. import java.util.concurrent.TimeUnit;
  34. @Service
  35. @Slf4j
  36. public class ContentService {
  37. @Autowired
  38. private RestHighLevelClient restHighLevelClient;
  39. //解析数据放入es的索引中
  40. public Boolean parseContent(String keywords) throws IOException {
  41. List<Content> contents = new HtmlParseUtil().parseJD(keywords);
  42. //把查询的数据放入es中
  43. BulkRequest bulkRequest = new BulkRequest();
  44. bulkRequest.timeout("2m");
  45. for (int i = 0; i < contents.size(); i++) {
  46. bulkRequest.add(
  47. new IndexRequest("my_goods")
  48. .source(JSON.toJSONString(contents.get(i)), XContentType.JSON));
  49. }
  50. BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  51. return !bulk.hasFailures();
  52. }
  53. /**
  54. * 分页查询ES索引库中的信息
  55. * @param keyword 搜索关键词
  56. * @param pageNo 第几页
  57. * @param pageSize size
  58. * @return 数据集合
  59. * @throws IOException
  60. */
  61. public List<Map<String, Object>> searchPage(String keyword, int pageNo, int pageSize) throws IOException {
  62. if (pageNo <= 1) {
  63. pageNo = 1;
  64. }
  65. //条件搜索
  66. SearchRequest searchRequest = new SearchRequest("my_goods");
  67. SearchSourceBuilder builder = new SearchSourceBuilder();
  68. //分页
  69. builder.from(pageNo-1);
  70. builder.size(pageSize);
  71. //精准匹配
  72. TermQueryBuilder titles = QueryBuilders.termQuery("title", keyword);
  73. builder.query(titles);
  74. builder.timeout(new TimeValue(60, TimeUnit.SECONDS));
  75. //高亮设置
  76. HighlightBuilder highlightBuilder = new HighlightBuilder();
  77. highlightBuilder.field("title");
  78. highlightBuilder.preTags("<span style='color:red'>");
  79. highlightBuilder.postTags("</span>");
  80. builder.highlighter(highlightBuilder);
  81. //执行搜索
  82. searchRequest.source(builder);
  83. SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  84. //解析结果
  85. ArrayList<Map<String, Object>> list = new ArrayList<>();
  86. for (SearchHit hit : search.getHits().getHits()) {
  87. Map<String, HighlightField> highlightFields = hit.getHighlightFields();
  88. HighlightField title = highlightFields.get("title");
  89. Map<String, Object> sourceAsMap = hit.getSourceAsMap();//原来的的结果
  90. //解析高亮字段
  91. if (title != null) {
  92. Text[] fragments = title.fragments();
  93. StringBuilder nText = new StringBuilder();
  94. for (Text text : fragments) {
  95. nText.append(text);
  96. }
  97. sourceAsMap.put("title", nText.toString());
  98. }
  99. list.add(sourceAsMap);
  100. }
  101. return list;
  102. }
  103. /**
  104. * 手动增加一条信息
  105. * @param content 新增对象
  106. * @throws IOException
  107. */
  108. public String addContent(Content content) throws IOException {
  109. //创建请求
  110. IndexRequest goods = new IndexRequest("my_goods");
  111. //规则 put/my_goods/_doc/id
  112. goods.id(content.getId());
  113. goods.timeout(TimeValue.timeValueSeconds(1));
  114. //将我们的数据放入请求 json
  115. goods.source(JSON.toJSONString(content), XContentType.JSON);
  116. //客户端发送请求
  117. IndexResponse ir = restHighLevelClient.index(goods, RequestOptions.DEFAULT);
  118. log.info("打印响应状态信息---》{}",ir.status() );
  119. return ir.status().getStatus()==200?"插入数据成功!":"插入数据失败~";
  120. }
  121. /**
  122. * 删除一条信息
  123. * @param id 对应id
  124. * @throws IOException
  125. */
  126. public String deleteOne(String id) throws IOException {
  127. DeleteRequest deleteRequest = new DeleteRequest("my_goods", id);
  128. deleteRequest.timeout("1s");
  129. DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
  130. log.info("打印响应结果状态信息---》{}", response.status());
  131. return response.status().getStatus()==200?"删除成功!":"删除失败~";
  132. }
  133. /**
  134. * 修改某数据
  135. * @param content 更新数据
  136. * @throws IOException
  137. */
  138. public String updateOne(Content content) throws IOException {
  139. UpdateRequest updateRequest = new UpdateRequest("my_goods", content.getId());
  140. updateRequest.timeout("1s");
  141. updateRequest.doc(JSON.toJSONString(content),XContentType.JSON);
  142. UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
  143. log.info("打印响应结果状态信息---》{}", response.status());
  144. return response.status().getStatus()==200?"更新成功!":"更新失败~";
  145. }
  146. }

实现数据解析业务后,后续通过接口即可爬取对应的信息到ES索引库中了。

Step5:构建Cotroller接口

  1. package com.yy.config.controller;
  2. import com.yy.config.Service.ContentService;
  3. import com.yy.config.pojo.Content;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.*;
  6. import java.io.IOException;
  7. import java.util.List;
  8. import java.util.Map;
  9. @RestController
  10. public class ContentController {
  11. @Autowired
  12. private ContentService contentService;
  13. @GetMapping("/parse/{keyword}")
  14. public Boolean parse(@PathVariable("keyword") String keyword) throws IOException {
  15. return contentService.parseContent(keyword);
  16. }
  17. @GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
  18. public List<Map<String, Object>> search(
  19. @PathVariable("keyword") String keyword,
  20. @PathVariable("pageNo") int pageNo,
  21. @PathVariable("pageSize") int pageSize) throws IOException {
  22. return contentService.searchPage(keyword, pageNo, pageSize);
  23. }
  24. @PostMapping("/update")
  25. public String update(@RequestBody Content content) throws IOException {
  26. return contentService.updateOne(content);
  27. }
  28. @PostMapping("/add")
  29. public String add(@RequestBody Content content) throws IOException {
  30. return contentService.addContent(content);
  31. }
  32. @DeleteMapping("/delete/{id}")
  33. public String delete(@PathVariable String id) throws IOException {
  34. return contentService.deleteOne(id);
  35. }
  36. }

基础操作接口实现成功后,再启动项目之前,还需要启动本地的ES服务,有准备的可以同时开启ES-head-master和kibana等可视化平台,方便查看功能实现结果。

Step6:接口测试

首先测试parse/{keyword}接口,解析爬取的数据并将其放入ES索引库,这样ES就有数据信息了。笔者这里依此存放了Java、Vue、Linux、Go的信息数据。

 接口测试成功后通过es-head-master查看数据是否已存入ES。

数据导入基本完成。

测试分页查询,查询前3条带有Vue的数据:

 数据已查询到,并且对应关键字已经用高亮显示,由于测试原因,这里并不能直接观察高亮结果,但是如果前端解析就可见了。

测试新增一条数据结果:

 并且在可视化查询中也插入成功:

 

相应的更新、删除也能实现预期结果,这里就不重复展示接口测试结果了。这里要注意的是,笔者在代码层面使用的是termQuery精确查找,因此只会返会有的结果,如果match匹配的话可能会出现很多带有查询字段的结果。

四  实际使用中的ES数据同步

以上只是学习过程中的ES集成SpringBoot的基础操作使用。在日常项目使用中一般不会用它做数据存储,作为一个优秀的数据搜索和分析引擎,一般在日志收集和数据搜索上被广泛应用。这里只分享一个作为数据搜索时的中间件的简单使用方式。

这里以一个商城微服务项目为例,因为在商城系统中搜索服务肯定占主流的,用户根据商品关键字检索商品信息。如果只用数据库必然会承受大量访问压力。此时ES作为商品信息检索中间件使用的重要性就体现出来了。当前台界面加载的时候数据库执行一次查询操作并将查询的商品数据同步到ES索引库中,当用户搜索商品时,直接从ES库查询数据,避免用户频繁的直接对数据库查询操作。并且ES提供的高亮也能让查询信息更加清晰。

基本流程图如下:

 当两个服务都启动时,搜索服务在运行时首先检查ES服务中是否存在商品信息索引并建立新的索引表。然后调用商品服务接口中的商品搜索业务,向数据库查询所有商品数据,并将查询到的数据转换为JSON同步插入到ES中。此时当用户调用商品搜索时直接从ES库返回商品数据信息,而不是从数据库查询。当管理员对数据库执行相应地增删操作时,也会在业务逻辑中同时添加对ES的同步修改(也是容易出现数据一致性的环节)。步骤如下:

Step1:ES客户端访问配置

这里导入依赖就省略不再重复赘述了,依赖于ES服务尽量一致就行。

  1. package com.yy.Config;
  2. import org.apache.http.HttpHost;
  3. import org.elasticsearch.client.RestClient;
  4. import org.elasticsearch.client.RestHighLevelClient;
  5. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  6. import org.springframework.amqp.support.converter.MessageConverter;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. /**
  10. * @author young
  11. * Date 2023/2/12 16:46
  12. * Description: es搜索引擎客户端配置
  13. */
  14. @Configuration
  15. public class ElasticSearchConfiguration {
  16. /**
  17. * 配置es客户端
  18. * @return RestHighLevelClient客户端访问配置
  19. */
  20. @Bean
  21. public RestHighLevelClient restHighLevelClient(){
  22. return new RestHighLevelClient(
  23. RestClient.builder(HttpHost.create("http://localhost:9200")));
  24. }
  25. }

Step2:配置监听器实现数据同步

这里配置一个监听器继承ApplicationRunner接口即可,并实现接口里面的run(ApplicationArguments args)方法,这个接口可以让项目在启动时候初始化一些信息,也就是在初始化时,将建立的索引及查询的商品信息同步到ES库中。

  1. package com.yy.listener;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yy.client.ProductClient;
  4. import com.yy.constants.SearchIndex;
  5. import com.yy.doc.ProductDoc;
  6. import com.yy.pojo.Product;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.elasticsearch.action.bulk.BulkRequest;
  9. import org.elasticsearch.action.index.IndexRequest;
  10. import org.elasticsearch.client.RequestOptions;
  11. import org.elasticsearch.client.RestHighLevelClient;
  12. import org.elasticsearch.client.indices.CreateIndexRequest;
  13. import org.elasticsearch.client.indices.GetIndexRequest;
  14. import org.elasticsearch.common.xcontent.XContentType;
  15. import org.elasticsearch.index.query.QueryBuilder;
  16. import org.elasticsearch.index.query.QueryBuilders;
  17. import org.elasticsearch.index.reindex.DeleteByQueryRequest;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.boot.ApplicationArguments;
  20. import org.springframework.boot.ApplicationRunner;
  21. import org.springframework.data.redis.core.index.GeoIndexDefinition;
  22. import org.springframework.stereotype.Component;
  23. import org.springframework.test.context.TestPropertySource;
  24. import javax.annotation.Resource;
  25. import java.util.List;
  26. /**
  27. * @author young
  28. * Date 2023/2/12 16:46
  29. * Description: 数据同步
  30. */
  31. @Component
  32. @Slf4j
  33. public class SpringBootListener implements ApplicationRunner {
  34. @Autowired
  35. private ProductClient productClient;
  36. @Resource
  37. private RestHighLevelClient restHighLevelClient;
  38. /**
  39. * 需要在此方法,完成es数据的同步!
  40. * 1.判断下es中product索引是否存在
  41. * 2.不存在,java代码创建一个
  42. * 3.存在删除原来的数据
  43. * 4.查询商品全部数据
  44. * 5.进行es库的更新工作[插入]
  45. * @param args
  46. * @throws Exception
  47. */
  48. @Override
  49. public void run(ApplicationArguments args) throws Exception {
  50. //判断es中是否存在product的索引
  51. GetIndexRequest indexDefinition = new GetIndexRequest(SearchIndex.PRODUCT);
  52. boolean exists = restHighLevelClient.indices().exists(indexDefinition, RequestOptions.DEFAULT);
  53. //判断处理
  54. if (exists){
  55. //存在 删除所有数据
  56. DeleteByQueryRequest queryRequest = new DeleteByQueryRequest(SearchIndex.PRODUCT);
  57. //全部删除
  58. queryRequest.setQuery(QueryBuilders.matchAllQuery());
  59. restHighLevelClient.deleteByQuery(queryRequest,RequestOptions.DEFAULT);
  60. }else{
  61. //不存在的话创建product新的索引表
  62. CreateIndexRequest createIndexRequest = new CreateIndexRequest(SearchIndex.PRODUCT);
  63. String indexStr = "{\n" +
  64. " \"mappings\": {\n" +
  65. " \"properties\": {\n" +
  66. " \"productId\":{\n" +
  67. " \"type\": \"integer\",\n" +
  68. " \"index\":true\n" +
  69. " },\n" +
  70. " \"productName\":{\n" +
  71. " \"type\": \"text\",\n" +
  72. " \"analyzer\": \"ik_smart\",\n" +
  73. " \"copy_to\": \"all\"\n" +
  74. " },\n" +
  75. " \"categoryId\":{\n" +
  76. " \"type\": \"integer\"\n" +
  77. " },\n" +
  78. " \"productTitle\":{\n" +
  79. " \"type\": \"text\",\n" +
  80. " \"analyzer\": \"ik_smart\",\n" +
  81. " \"copy_to\": \"all\"\n" +
  82. " },\n" +
  83. " \"productIntro\":{\n" +
  84. " \"type\":\"text\",\n" +
  85. " \"analyzer\": \"ik_smart\",\n" +
  86. " \"copy_to\": \"all\"\n" +
  87. " },\n" +
  88. " \"productPicture\":{\n" +
  89. " \"type\": \"keyword\",\n" +
  90. " \"index\": false\n" +
  91. " },\n" +
  92. " \"productPrice\":{\n" +
  93. " \"type\": \"double\",\n" +
  94. " \"index\": true\n" +
  95. " },\n" +
  96. " \"productSellingPrice\":{\n" +
  97. " \"type\": \"double\"\n" +
  98. " },\n" +
  99. " \"productNum\":{\n" +
  100. " \"type\": \"integer\"\n" +
  101. " },\n" +
  102. " \"productSales\":{\n" +
  103. " \"type\": \"integer\"\n" +
  104. " },\n" +
  105. " \"all\":{\n" +
  106. " \"type\": \"text\",\n" +
  107. " \"analyzer\": \"ik_max_word\"\n" +
  108. " }\n" +
  109. " }\n" +
  110. " }\n" +
  111. "}\n";
  112. createIndexRequest.source(indexStr, XContentType.JSON);
  113. restHighLevelClient.indices().create(createIndexRequest,RequestOptions.DEFAULT);
  114. }
  115. //执行查询数据操作
  116. List<Product> productList = productClient.listProduct();
  117. //批量数据插入操作
  118. BulkRequest bulkRequest = new BulkRequest();
  119. ObjectMapper objectMapper = new ObjectMapper();
  120. for (Product product : productList) {
  121. ProductDoc productDoc = new ProductDoc(product);
  122. //用于插入数据的作用
  123. IndexRequest indexRequest = new IndexRequest(SearchIndex.PRODUCT);
  124. //将productDoc转换成JSON格式并插入es中
  125. String s = objectMapper.writeValueAsString(productDoc);
  126. indexRequest.source(s,XContentType.JSON);
  127. bulkRequest.add(indexRequest);
  128. }
  129. restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
  130. }
  131. }

Step3:搜索业务实现

主要是对关键字的分页查询,以及商品信息同步的插入、更新及删除操作,当管理员修改商品信息时调用该业务实现的接口同步修改ES。

  1. package com.yy.service.impl;
  2. import cn.hutool.core.text.CharSequenceUtil;
  3. import com.fasterxml.jackson.core.JsonProcessingException;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.yy.Result;
  6. import com.yy.constants.ResultEnum;
  7. import com.yy.constants.SearchIndex;
  8. import com.yy.doc.ProductDoc;
  9. import com.yy.exception.MyException;
  10. import com.yy.param.ProductSearchParam;
  11. import com.yy.pojo.Product;
  12. import com.yy.service.SearchService;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.elasticsearch.action.delete.DeleteRequest;
  15. import org.elasticsearch.action.index.IndexRequest;
  16. import org.elasticsearch.action.search.SearchRequest;
  17. import org.elasticsearch.action.search.SearchResponse;
  18. import org.elasticsearch.client.RequestOptions;
  19. import org.elasticsearch.client.RestHighLevelClient;
  20. import org.elasticsearch.common.xcontent.XContentType;
  21. import org.elasticsearch.index.query.QueryBuilders;
  22. import org.elasticsearch.search.SearchHit;
  23. import org.elasticsearch.search.SearchHits;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.util.StringUtils;
  26. import javax.annotation.Resource;
  27. import java.io.IOException;
  28. import java.util.ArrayList;
  29. import java.util.List;
  30. /**
  31. * @author young
  32. * Date 2023/2/12 17:12
  33. * Description: store-cloud
  34. */
  35. @Service
  36. @Slf4j
  37. public class SearchServiceImpl implements SearchService {
  38. @Resource
  39. private RestHighLevelClient restHighLevelClient;
  40. /**
  41. * 根据关键字和分页进行数据库数据查询
  42. * 1. 判断关键字是否为null null查询全部 不为null all 字段查询
  43. * 2. 添加分页属性
  44. * 3. es查询
  45. * 4. 结果处理
  46. * @param productSearchParam
  47. * @return
  48. */
  49. @Override
  50. public Result search(ProductSearchParam productSearchParam) {
  51. SearchRequest searchRequest = new SearchRequest(SearchIndex.PRODUCT);
  52. String search = productSearchParam.getSearch();
  53. if (CharSequenceUtil.isEmpty(search)){
  54. //null 不添加all关键字,查询全部即可
  55. //查询全部
  56. searchRequest.source().query(QueryBuilders.matchAllQuery());
  57. }else {
  58. //不为null则添加all匹配
  59. searchRequest.source().query(QueryBuilders.matchQuery("all",search));
  60. }
  61. //进行分页数据添加
  62. //偏移量 (当前页数-1)*页容量
  63. searchRequest.source().from((productSearchParam.getCurrentPage()-1)*productSearchParam.getPageSize());
  64. searchRequest.source().size(productSearchParam.getPageSize());
  65. SearchResponse searchResponse = null;
  66. try {
  67. searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  68. } catch (IOException e) {
  69. throw new MyException(ResultEnum.ERROR.getCode(),e.getMessage());
  70. }
  71. SearchHits hits = searchResponse.getHits();
  72. //查询符合的数据
  73. long total = hits.getTotalHits().value;
  74. //获得数据集合
  75. SearchHit[] hitsList = hits.getHits();
  76. List<Product> products = new ArrayList<>();
  77. //json处理器处理数据格式转换
  78. ObjectMapper objectMapper = new ObjectMapper();
  79. for (SearchHit documentFields : hitsList) {
  80. //查询的内容数据! productDoc模型对应的json数据
  81. String source = documentFields.getSourceAsString();
  82. Product product = null;
  83. try {
  84. //productDoc all - product 如果没有all的属性,会报错! jackson提供忽略没有属性的注解
  85. //TODO: 修改product的实体类,添加忽略没有属性的注解!
  86. product=objectMapper.readValue(source,Product.class);
  87. } catch (JsonProcessingException e) {
  88. e.printStackTrace();
  89. }
  90. products.add(product);
  91. }
  92. Result result = Result.ok(products,total);
  93. log.info("es的搜索业务执行完毕,结果数据为:{}",result);
  94. return result;
  95. }
  96. /**
  97. * 商品同步 : 插入和更新
  98. *
  99. * @param product
  100. * @return
  101. */
  102. @Override
  103. public Result save(Product product) throws IOException {
  104. IndexRequest indexRequest
  105. = new IndexRequest(SearchIndex.PRODUCT).id(product.getProductId().toString());
  106. ProductDoc productDoc = new ProductDoc(product);
  107. ObjectMapper objectMapper = new ObjectMapper();
  108. String json = objectMapper.writeValueAsString(productDoc);
  109. indexRequest.source(json, XContentType.JSON);
  110. restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  111. return Result.ok().message("数据同步成功!");
  112. }
  113. /**
  114. * 进行es库的商品删除
  115. *
  116. * @param productId
  117. * @return
  118. */
  119. @Override
  120. public Result remove(Integer productId) throws IOException {
  121. DeleteRequest request = new DeleteRequest(SearchIndex.PRODUCT).id(productId.toString());
  122. restHighLevelClient.delete(request,RequestOptions.DEFAULT);
  123. return Result.ok().message("es库的数据删除成功!");
  124. }
  125. }

Step4:实现搜索服务接口

构建Controller层,保证接口可用性。

  1. package com.yy.controller;
  2. import com.yy.Result;
  3. import com.yy.param.ProductSearchParam;
  4. import com.yy.pojo.Product;
  5. import com.yy.service.impl.SearchServiceImpl;
  6. import org.springframework.web.bind.annotation.*;
  7. import javax.annotation.Resource;
  8. import java.io.IOException;
  9. /**
  10. * @author young
  11. * Date 2023/2/12 17:12
  12. * Description: 搜索服务接口
  13. */
  14. @RequestMapping("/search")
  15. @RestController
  16. public class ElasticSearchController {
  17. @Resource
  18. private SearchServiceImpl searchService;
  19. @PostMapping("product")
  20. public Result searchProduct(@RequestBody ProductSearchParam productSearchParam){
  21. return searchService.search(productSearchParam);
  22. }
  23. /**
  24. * 同步調用,進行商品插入!覆蓋更新的!
  25. * @param product
  26. * @return
  27. */
  28. @PostMapping("save")
  29. public Result saveProduct(@RequestBody Product product) throws IOException {
  30. return searchService.save(product);
  31. }
  32. @DeleteMapping("remove")
  33. public Result removeProduct(@RequestParam Integer productId) throws IOException {
  34. return searchService.remove(productId);
  35. }
  36. }

Step5:商品服务业务远程调用搜索接口

这里为了方便只展示部分有关业务实现过程...

  1. @Service
  2. @Slf4j
  3. public class ProductServiceImpl extends ServiceImpl<ProductMapper, Product> implements ProductService {
  4. //引入客户端
  5. @Autowired
  6. private SearchClient searchClient;
  7. ……
  8. /**
  9. * 商品保存业务
  10. * 1. 商品数据保存
  11. * 2. 商品的图片详情切割和保存
  12. * 3. 搜索数据库的数据添加
  13. * 4. 清空商品相关的缓存数据
  14. * @param productSaveParam
  15. * @return
  16. */
  17. @CacheEvict(value = "list.product",allEntries = true)
  18. @Override
  19. @CostTime
  20. public Result adminSave(ProductSaveParam productSaveParam) {
  21. Product product = new Product();
  22. BeanUtils.copyProperties(productSaveParam,product);
  23. int rows = productMapper.insert(product); //商品数据插入
  24. log.info("ProductServiceImpl.adminSave业务结束,结果:{}",rows);
  25. //商品图片获取 +
  26. String pictures = productSaveParam.getPictures();
  27. if (!CharSequenceUtil.isEmpty(pictures)){
  28. //截取特殊字符串的时候 \\ [] 包含 $ + * | ?
  29. String[] urls = pictures.split("\\+");
  30. for (String url : urls) {
  31. Picture picture = new Picture();
  32. picture.setProductId(product.getProductId());
  33. picture.setProductPicture(url);
  34. pictureMapper.insert(picture); //插入商品的图片
  35. }
  36. }
  37. //同步搜索服务的数据
  38. searchClient.saveOrUpdate(product);
  39. return Result.ok().message("商品数据添加成功!");
  40. }
  41. /**
  42. * 商品数据更新
  43. * 1.更新商品数据
  44. * 2.同步搜索服务数据即可
  45. * @param product
  46. * @return
  47. */
  48. @Override
  49. public Result adminUpdate(Product product) {
  50. productMapper.updateById(product);
  51. //同步搜索服务的数据
  52. searchClient.saveOrUpdate(product);
  53. return Result.ok().message("商品数据更新成功!");
  54. }
  55. ……
  56. }

后续关于商品服务的接口就不一一陈列了,直接运行服务测试即可。

Step6:数据同步测试

启动两个服务后在ElasticSearch-head-mater中就可以看到数据基本已经全部同步到ES中了。

 测试分页查询商品结果:

 测试添加商品结果:

 此时插入的一条新数据就成功了,查看ES库和数据库是否同时插入数据:

 测试结果符合预期。相应的删除操作也是能同时删除数据库数据与ES库中的商品信息。

五  小总结

Elasticsearch作为一个开源搜索引擎,它在内部使用Luence做索引与搜索,通过对Lucene的封装,提供了一套简单一致的RESTful API。

Elasticsearch也是一种分布式的搜索引擎架构,可以很简单地扩展到上百个服务节点,并支持PB级别的数据查询,使系统具备高可用和高并发性。

因此它在分布式环境下作为搜索中间件的使用上确实有不错表现。从架构层面来讲,流传的一句话是:没有什么是加一层不能解决的。在搜索领域加一层ES作为中间件确实能减少数据层检索访问压力,并且极大提高大数据下的数据搜索效率。但是多一层中间件也就意味着系统复杂程度更高一层,带来的系统风险(比如数据一致性问题)也是相对的。因此学习之余,合理应用各类技术,如何趋利避害也是一个学习者应该好好考虑的~

部分基础概念参考ElasticSearch详解

更多关于ElasticSearch的学习可自行参考ElasticSearch官网

学习资料源码后续根据需求会放在Gitee或者Gitub中,ES服务安装包(含ES-7.6.1版本、ElasticSearch-head-master、kibana-7.6.1、ik分词器压缩包)基本已配置完毕的压缩包可在资源获取或者直接找笔者拿。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/936236
推荐阅读
相关标签
  

闽ICP备14008679号