当前位置:   article > 正文

Spring Boot整合Elasticsearch、Logstash实现MySQL数据同步及全文搜索_springboot集成logstash

springboot集成logstash

Elasticsearch

先来了解一下Elasticsearch

欢迎来到 Elastic — Elasticsearch 和 Kibana 的开发者 | Elastic

Elasticsearch 是一个开源的分布式搜索和分析引擎,它被设计用于处理和存储大规模的实时数据。它的主要特点是快速、强大的搜索能力和灵活的数据分析功能。以下是 Elasticsearch 的一些关键特性和用途:

  • 分布式架构:Elasticsearch 被设计为一个分布式系统,它可以在多台服务器上运行,形成一个集群。数据在集群中自动分片和复制,从而提供高可用性和可伸缩性。

  • 实时搜索和分析:Elasticsearch 提供了非常快速的实时搜索能力,可以在大规模数据集上进行快速的全文搜索,支持各种查询操作,如模糊搜索、精确匹配、范围查询等。

  • 多数据类型支持:Elasticsearch 不仅支持文本数据,还支持数字、日期、地理位置等多种数据类型的索引和查询。

  • 全文搜索引擎:Elasticsearch 提供强大的全文搜索功能,支持分词、语义分析、拼写纠错等,能够在大量文本数据中快速找到相关的结果。

  • 多种查询和过滤器:Elasticsearch 提供丰富的查询语法和过滤器,使用户能够更精确地检索数据。

  • 文档导向型数据库:Elasticsearch 是一种文档导向型数据库,数据以 JSON 格式的文档形式存储,每个文档都有一个唯一的标识符,称为文档 ID。

Elasticsearch 被广泛应用于各种领域,包括企业搜索、日志和事件数据分析、电子商务网站搜索、内容管理系统、业务指标监控和可视化等。它的强大搜索和分析能力使其成为处理大规模数据的重要工具之一。

Spring Boot后端配置

maven配置

Maven Repository: Search/Browse/Explore (mvnrepository.com)

pom.xml文件导相应的maven包

  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  5. </dependency>

application.properties配置

  1. #---------------------------------------
  2. # ELASTICSEARCH 搜索引擎配置
  3. #---------------------------------------
  4. # 连接数据库
  5. spring.elasticsearch.rest.uris=127.0.0.1:9200
  6. # 数据库用户名
  7. spring.elasticsearch.rest.username=elasticsearch

model层

定义相应的实体类FileSearchEntity,将其置于model层

  1. package com.ikkkp.example.model.po.es;
  2. import lombok.Data;
  3. import org.springframework.data.annotation.Id;
  4. import org.springframework.data.elasticsearch.annotations.Document;
  5. import org.springframework.data.elasticsearch.annotations.Field;
  6. import org.springframework.data.elasticsearch.annotations.FieldType;
  7. @Data
  8. @Document(indexName = "file_search")
  9. public class FileSearchEntity {
  10. @Id
  11. private Integer fileID;
  12. @Field(name = "analyzer_title", type = FieldType.Text, searchAnalyzer = "ik_max_word", analyzer = "ik_smart")
  13. private String title;
  14. @Field(name = "analyzer_abstract_content", type = FieldType.Text, searchAnalyzer = "ik_max_word", analyzer = "ik_smart")
  15. private String abstractContent;
  16. @Field(type = FieldType.Integer)
  17. private Integer size;
  18. @Field(name = "file_type", type = FieldType.Text)
  19. private String fileType;
  20. @Field(name = "upload_username", type = FieldType.Text)
  21. private String uploadUsername;
  22. @Field(name = "preview_picture_object_name", type = FieldType.Text)
  23. private String previewPictureObjectName;
  24. @Field(name = "payment_method", type = FieldType.Integer)
  25. private Integer paymentMethod;
  26. @Field(name = "payment_amount", type = FieldType.Integer)
  27. private Integer paymentAmount;
  28. @Field(name = "is_approved", type = FieldType.Boolean)
  29. private String isApproved;
  30. @Field(name = "hide_score", type = FieldType.Double)
  31. private Double hideScore;
  32. @Field(name = "analyzer_content",type = FieldType.Text, searchAnalyzer = "ik_max_word", analyzer = "ik_smart")
  33. private String content;
  34. @Field(name = "analyzer_keyword",type = FieldType.Keyword)
  35. private String keyword;
  36. @Field(name = "is_vip_income", type = FieldType.Text)
  37. private String isVipIncome;
  38. @Field(name = "score", type = FieldType.Text)
  39. private String score;
  40. @Field(name = "raters_num", type = FieldType.Text)
  41. private String ratersNum;
  42. @Field(name = "read_num", type = FieldType.Text)
  43. private String readNum;
  44. }

service层

定义相应的类ESearchService,将其置于service层

searchFile:通过创建了不同的查询条件和选项,搜索结果中包含了高亮信息,可以在前端界面中用于显示搜索结果时突出显示匹配的关键字。我们通过该方法进行文章全文的内容搜索。

suggestTitle:可以实现在搜索框中提供实时的自动补全建议,根据用户输入的关键字快速展示可能的补全项。

  1. package com.ikkkp.example.service.esImpl;
  2. import com.ikkkp.example.model.po.es.FileSearchEntity;
  3. import org.elasticsearch.common.unit.Fuzziness;
  4. import org.elasticsearch.index.query.BoolQueryBuilder;
  5. import org.elasticsearch.index.query.QueryBuilders;
  6. import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
  7. import org.elasticsearch.search.suggest.Suggest;
  8. import org.elasticsearch.search.suggest.SuggestBuilder;
  9. import org.elasticsearch.search.suggest.SuggestBuilders;
  10. import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
  11. import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
  12. import org.springframework.data.domain.PageRequest;
  13. import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
  14. import org.springframework.data.elasticsearch.core.SearchHits;
  15. import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
  16. import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
  17. import org.springframework.stereotype.Service;
  18. import javax.annotation.Resource;
  19. import java.util.*;
  20. @Service
  21. public class ESearchService {
  22. @Resource
  23. private ElasticsearchRestTemplate elasticsearchRestTemplate;
  24. public SearchHits<FileSearchEntity> searchFile(String keywords, Integer page, Integer rows) {
  25. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
  26. .should(QueryBuilders.fuzzyQuery("analyzer_title", keywords).fuzziness(Fuzziness.AUTO))
  27. .should(QueryBuilders.fuzzyQuery("analyzer_content", keywords).fuzziness(Fuzziness.AUTO))
  28. .should(QueryBuilders.fuzzyQuery("analyzer_abstract_content", keywords).fuzziness(Fuzziness.AUTO))
  29. .must(QueryBuilders.multiMatchQuery(keywords,"analyzer_title","analyzer_content","analyzer_abstract_content"))
  30. .must(QueryBuilders.matchQuery("is_approved", "true"));//必须是已经被核准的才能被检索出来
  31. //构建高亮查询
  32. NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
  33. .withQuery(boolQueryBuilder)
  34. .withHighlightFields(
  35. new HighlightBuilder.Field("analyzer_title"),
  36. new HighlightBuilder.Field("analyzer_abstract_content"),
  37. new HighlightBuilder.Field("analyzer_content"))
  38. .withHighlightBuilder(new HighlightBuilder().preTags("<span class='highlight'>").postTags("</span>"))
  39. .withPageable(PageRequest.of(page - 1, rows)).build();
  40. SearchHits<FileSearchEntity> searchHits = elasticsearchRestTemplate.search(searchQuery, FileSearchEntity.class);
  41. return searchHits;
  42. }
  43. public ArrayList<String> suggestTitle(String keyword,Integer rows) {
  44. return suggest("suggest_title",keyword,rows);
  45. }
  46. public ArrayList<String> suggest(String fieldName, String keyword,Integer rows) {
  47. HashSet<String> returnSet = new LinkedHashSet<>(); // 用于存储查询到的结果
  48. // 创建CompletionSuggestionBuilder
  49. CompletionSuggestionBuilder textBuilder = SuggestBuilders.completionSuggestion(fieldName) // 指定字段名
  50. .size(rows) // 设定返回数量
  51. .skipDuplicates(true); // 去重
  52. // 创建suggestBuilder并将completionBuilder添加进去
  53. SuggestBuilder suggestBuilder = new SuggestBuilder();
  54. suggestBuilder.addSuggestion("suggest_text", textBuilder)
  55. .setGlobalText(keyword);
  56. // 执行请求
  57. Suggest suggest = elasticsearchRestTemplate.suggest(suggestBuilder, elasticsearchRestTemplate.getIndexCoordinatesFor(FileSearchEntity.class)).getSuggest();
  58. // 取出结果
  59. Suggest.Suggestion<Suggest.Suggestion.Entry<CompletionSuggestion.Entry.Option>> textSuggestion = suggest.getSuggestion("suggest_text");
  60. for (Suggest.Suggestion.Entry<CompletionSuggestion.Entry.Option> entry : textSuggestion.getEntries()) {
  61. List<CompletionSuggestion.Entry.Option> options = entry.getOptions();
  62. for (Suggest.Suggestion.Entry.Option option : options) {
  63. returnSet.add(option.getText().toString());
  64. }
  65. }
  66. return new ArrayList<>(returnSet);
  67. }
  68. }

controller层

定义相应的类DocSearchController,将其置于controller层

  1. package com.ikkkp.example.controller;
  2. import com.ikkkp.example.model.vo.MsgEntity;
  3. import com.ikkkp.example.model.po.es.FileSearchEntity;
  4. import com.ikkkp.example.service.esImpl.ESearchService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.data.elasticsearch.core.SearchHits;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RequestMethod;
  10. import org.springframework.web.bind.annotation.RequestParam;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.util.ArrayList;
  13. @RestController
  14. @Slf4j
  15. @RequestMapping("/docSearchService")
  16. public class DocSearchController {
  17. @Autowired
  18. ESearchService eSearchService;
  19. @RequestMapping(value = "/search",method = RequestMethod.GET)
  20. public MsgEntity<SearchHits<FileSearchEntity>> searchDoc(@RequestParam String keywords, @RequestParam Integer page, @RequestParam Integer rows) {
  21. return new MsgEntity<>("SUCCESS", "1", eSearchService.searchFile(keywords, page, rows));
  22. }
  23. @RequestMapping(value = "/suggest",method = RequestMethod.GET)
  24. public MsgEntity<ArrayList<String>> suggestTitle(@RequestParam String keyword, @RequestParam Integer rows) {
  25. ArrayList<String> suggests = eSearchService.suggestTitle(keyword, rows);
  26. return new MsgEntity<>("SUCCESS", "1", suggests);
  27. }
  28. }

现在我们已经基本完成了Spring Boot的配置,但要明确几点:

我们现在是在es数据库里面直接拿相应的数据,一般情况下还要涉及到数据导入进es上面的问题(这个数据其实可以是多个方面),我们现在就以数据在mysql上面为例,下面我们通过Logstash来定时拉取mysql的数据到es上。

先来了解一下Logstash:

Logstash

再来了解一下Logstash

Logstash是一个开源的数据收集引擎,具有实时流水线功能。

它从多个源头接收数据,进行数据处理,然后将转化后的信息发送到stash,即存储。

Logstash允许我们将任何格式的数据导入到任何数据存储中,不仅仅是ElasticSearch。

它可以用来将数据并行导入到其他NoSQL数据库,如MongoDB或Hadoop,甚至导入到AWS。

数据可以存储在文件中,也可以通过流等方式进行传递。

Logstash对数据进行解析、转换和过滤。它还可以从非结构化数据中推导出结构,对个人数据进行匿名处理,可以进行地理位置查询等等。

一个Logstash管道有两个必要的元素,输入和输出,以及一个可选的元素,过滤器。

输入组件从源头消耗数据,过滤组件转换数据,输出组件将数据写入一个或多个目的地。

从官网下载Download Logstash Free | Get Started Now | Elastic

开箱即用!!孩子馋的都快哭了 

主目录结构

aec229f8418645e2bc2832070fe60ed2.png

在主目录下面创建mysqletc文件夹

0f5df995fb9c4410b8fb524b696af433.png其中filesearch.sql无需多说,是自动化从MySQL收集数据的sql

  1. SELECT
  2. file.file_id,
  3. file.title as analyzer_title,
  4. file.title as suggest_title,
  5. file.abstract_content as analyzer_abstract_content,
  6. file.size,
  7. file.file_type,
  8. file.upload_username,
  9. file.preview_picture_object_name,
  10. file.payment_amount,
  11. file.payment_method,
  12. file.is_approved,
  13. file.hide_score,
  14. file_search.content as analyzer_content,
  15. file_search.keyword as suggest_keyword,
  16. file_search.keyword as analyzer_keyword,
  17. file_extra.is_vip_income,
  18. file_extra.score,
  19. file_extra.raters_num,
  20. file_extra.read_num
  21. FROM
  22. file,
  23. file_search,
  24. file_extra
  25. WHERE
  26. file.file_id = file_search.file_id AND
  27. file.file_id = file_extra.file_id

mysql.conf是Logstash执行的配置文件

  1. input {
  2. stdin {
  3. }
  4. jdbc {
  5. # mysql 数据库链接,shop为数据库名
  6. jdbc_connection_string => "jdbc:mysql://localhost:3306/yourdatabase"
  7. # 用户名和密码
  8. jdbc_user => "root"
  9. jdbc_password => "password"
  10. # 驱动
  11. jdbc_driver_library => "../mysqletc/mysql-connector-java-8.0.28.jar"
  12. # 驱动类名
  13. jdbc_driver_class => "com.mysql.jdbc.Driver"
  14. jdbc_paging_enabled => "true"
  15. jdbc_page_size => "500"
  16. # 执行的sql 文件路径+名称
  17. statement_filepath => "../mysqletc/filesearch.sql"
  18. # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
  19. schedule => "* * * * *"
  20. # 索引类型
  21. type => "_doc"
  22. }
  23. }
  24. filter {
  25. json {
  26. source => "message"
  27. remove_field => ["message"]
  28. }
  29. date {
  30. match => ["timestamp","dd/MM/yyyy:HH:mm:ss Z"]
  31. }
  32. }
  33. output {
  34. elasticsearch {
  35. hosts => ["localhost:9200"]
  36. index => "file_search"
  37. document_type => "_doc"
  38. document_id => "%{file_id}"
  39. template_overwrite => true
  40. template => "../mysqletc/logstash-ik.json"
  41. }
  42. stdout {
  43. codec => json_lines
  44. }
  45. }

附上logstash-ik.json。这段 JSON 配置是用于创建 Elasticsearch 索引模板(Index Template),在这个特定的示例中,它是用来定义 Elasticsearch 索引的映射和设置。十分重要!

  1. {
  2. "template": "*",
  3. "version": 50001,
  4. "settings": {
  5. "index.refresh_interval": "5s"
  6. },
  7. "mappings": {
  8. "dynamic_templates": [
  9. {
  10. "suggest_fields": {
  11. "match":"suggest_*",
  12. "match_mapping_type": "string",
  13. "mapping": {
  14. "type": "completion",
  15. "norms": false,
  16. "analyzer": "ik_max_word"
  17. }
  18. }
  19. },{
  20. "analyzer_fields": {
  21. "match":"analyzer_*",
  22. "match_mapping_type": "string",
  23. "mapping": {
  24. "type": "text",
  25. "norms": false,
  26. "analyzer": "ik_max_word",
  27. "fields": {
  28. "keyword": {
  29. "type": "keyword",
  30. "ignore_above": 256
  31. }
  32. }
  33. }
  34. }
  35. },{
  36. "string_fields": {
  37. "match": "*",
  38. "match_mapping_type": "string",
  39. "mapping": {
  40. "type": "text",
  41. "norms": false
  42. }
  43. }
  44. }
  45. ],
  46. "properties": {
  47. "@timestamp": {
  48. "type": "date"
  49. },
  50. "@version": {
  51. "type": "keyword"
  52. }
  53. }
  54. }
  55. }

bin目录执行批处理

最后再执行批处理 K:\Data\elasticsearch-logstash\bin>logstash -f ../mysqletc/mysql.conf

这时候由于设置的定时处理器

# 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

我们就可以看到命令行将MySQL数据库的数据定时导入es里面

注意!!!logstash存放目录是不能含有中文名的

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号