当前位置:   article > 正文

ElasticSearch与SpringBoot的集成使用_springboot集成elasticsearch的使用

springboot集成elasticsearch的使用

一、ElasticSearch了解

        ElasticSearch是一个基于lucene的分布式检索服务器。相对于solr搜索,在大数据量和数据并发量上更有优势,同时,也有数据库的数据存储功能,但提供了比数据库更多的存储功能,如分词搜索、关联度搜索等,同时搜索速度也不是一个级别,达到百万数据/秒的查询数据。总结优点为:实时搜索、稳定、可靠、快速、安装方便等。

        ElasticSearch中的概念(或叫做结构)和数据库中进行对比:

        1index:索引,相当于数据库中的一个库,里面可以建立很多表,存储不同类型的数据

        2type:类型,相当于数据库中的一张表,存储json类型的数据

        3document:文档,一个文档相当于数据库中的一条数据

        4field:列,相当于数据库中的列,也就是一个属性

        5shards:分片,通俗理解,就是数据分成几块区域来存储,可以理解为mysql中的分库分表(不太恰当)

        6replicas:备份,就是分片的备份数,相当于数据库中的备份库

二、ElasticSearch+nodejs+head集成安装(安装环境:jdk8window10

        1、先安装ElasticSearch

            (注意:ElasticSearch目前最新版本达到了7.0.1,但不同的版本在不同的安装环境下回出现不同的兼容性问题,详见百度,目前遇到的问题有:用7.0.1版本时无法设置network.host0.0.0.0,这样见无法让外部服务去访问当前服务器,故改成现在的6.7.2是可以的,并且,ElasticSearchJava项目,需要依赖jdk,并且对jdk版本有要求,目前安装的是jdk8。另外,安装es服务时安装路径最好不要包含空格,否则在做通过logstash数据同步的时候会报莫名的错误)

            1)安装:具体安装为在官网https://www.elastic.co/cn/downloads/elasticsearch#ga-release下载相应的版本后如:elasticsearch-6.7.2.zip,进行解压即可。

            2)启动:到elasticsearch-6.7.2\bin目录下,启动elasticsearch.bat即可

            3)判断是否安装启动成功:访问http://localhost:9200,出现如下图即为成功

               https://oscimg.oschina.net/oscnet/5e1b674e33b26fc3fed57cebf66eab3ade2.jpg

        2、安装nodejs

            1)在https://nodejs.org/en/download/ 下载对应的版本,进行傻瓜式安装(默认安装成功后自动配置环境变量)

            2)安装成功后使用 node -v查看nodejs的版本,使用npm -v查看npm的版本,如图:

                        https://oscimg.oschina.net/oscnet/da041197035cb1a6e13d26a44604cfdf750.jpg

            3)在nodejs的根目录下,执行npm install -g grunt-cli命令,安装grunt,安装完成后执行grunt -version查看是否安装成功,如图:

                        https://oscimg.oschina.net/oscnet/e2299f56c609bfbeb044496a61ce40f3ac5.jpg

            3、安装head

                1)网上下载elasticsearch-head.zip文件解压即可

                2)修改elasticsearch-head文件目录下的Gruntfile.js文件,添加如下内容:

                          https://oscimg.oschina.net/oscnet/063c80c79ba7145b90398a2dd95c9d3be7a.jpg

                3)修改elasticsearch-head\_site文件目录下的app.js文件内容,将红框中的内容修改为服务器地址,或是本机部署则不用修改。

                       https://oscimg.oschina.net/oscnet/4d20fdf6f76dfb599715f14660466c9ad4b.jpg

                  4)进入elasticsearch-head的跟目录下执行npm install 命令,如图:

                       https://oscimg.oschina.net/oscnet/dbf7fa1a40c651d3c37000c3938b649f668.jpg

            5)在elasticsearch-head的根目录下启动nodejs,执行grunt server 或者 npm run start,如图:

                        https://oscimg.oschina.net/oscnet/6e562213ca7cde8e9f75bc190e17152d094.jpg    

            6)访问http://localhost:9100,出现如图所示,则head+nodejs安装成功

                        https://oscimg.oschina.net/oscnet/be9278b9c5ca1ef544226a5069af04acb08.jpg

               7)、若想让head启动并链接elasticsearch成功,需先启动elasticsearch,若headelasticsearch不在同一服务器上时,需要在elasticsearch中做如下配置:

                            修改elasticsearch.yml文件,在文件末尾加入:

                            http.cors.enabled: true 
                            http.cors.allow-origin: "*"
                            node.master: true
                            node.data: true

                            放开network.host: 192.168.0.1的注释并改为network.host: 0.0.0.0(这样外部服务器也能访问es服务)

                            放开cluster.name(集群名称,服务启动前修改后,以后不要再随意修改);node.name(集群使用时回用到);http.port(默认端口号即可)的注释

                            双击elasticsearch.bat重启es

                           修改完elasticsearch.yml文件完启动es后再访问http://localhost:9100,若head链接es成功后如下图:

                            https://oscimg.oschina.net/oscnet/d67a1c07a1b320e17b6ef9d3493c4e8dc05.jpg

        4、在head中对es数据进行操作,如,添加删除索引,对索引中的数据进行增删改差等操作。(详见百度)

三、springboot+elasticsearch集成及简单使用

       1、先搭建好单独的springboot项目,以及部署好es服务(这里我是在之前搭建的SpringBoot多模块项目集成es)

       2相关依赖

  1. <!-- SpringBoot与elasticsearch整合的相关依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  5. </dependency>

      3、配置文件

  1. #增加elasticsearch配置
  2. spring.data.elasticsearch.repositories.enabled = true
  3. spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
  4. spring.data.elasticsearch.cluster-name=my-application-111

      4、相关代码

    Story
  1. package net.cnki.es.entity;
  2. import org.springframework.data.annotation.Id;
  3. import org.springframework.data.elasticsearch.annotations.Document;
  4. import org.springframework.data.elasticsearch.annotations.Field;
  5. import org.springframework.data.elasticsearch.annotations.FieldType;
  6. import java.util.Date;
  7. @Document(indexName = "story",type="doc",shards=5,replicas=1)
  8. public class Story {
  9. /**
  10. * @Document 作用在类,标记实体类为文档对象,一般有两个属性:
  11. * indexName:对应索引库名称
  12. * type:对应索引库中的类型(从其他文档中查看说是相当于关系型数据库中的一个表)
  13. * shards:分片数量,默认5个
  14. * replicas:副本数量,默认1个
  15. * @Id 作用在成员变量,标记一个字段作为id主键
  16. * @Field 作用在成员变量,标记为文档的字段,并指定字段映射属性:
  17. * type:字段类型,是枚举:FieldType,可以是text、long、short、date、integer、object等
  18. * text:存储数据时候,会自动分词,并生成索引
  19. * keyword:存储数据时候,不会分词建立索引
  20. * Numerical:数值类型,分两类
  21. * 基本数据类型:long、interger、short、byte、double、float、half_float
  22. * 浮点数的高精度类型:scaled_float
  23. * 需要指定一个精度因子,比如10或100。elasticsearch会把真实值乘以这个因子后存储,取出时再还原。
  24. * Date:日期类型
  25. * elasticsearch可以对日期格式化为字符串存储,但是建议我们存储为毫秒值,存储为long,节省空间。
  26. * index:是否索引,布尔类型,默认是true
  27. * store:是否存储,布尔类型,默认是false
  28. * analyzer:分词器名称,这里的ik_max_word即使用ik分词器
  29. * IK分词器有两种类型,分别是ik_smart分词器和ik_max_word分词器。
  30. * ik_smart: 会做最粗粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,国歌”。
  31. * ik_max_word: 会将文本做最细粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,中华人民,中华,华人,人民共和国,人民,人,民,共和国,共和,和,国国,国歌”,会穷尽各种可能的组合;
  32. */
  33. @Id
  34. private Integer id;
  35. @Field(type= FieldType.text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")
  36. private String volume;
  37. @Field(type=FieldType.text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")
  38. private String chapter;
  39. @Field(type=FieldType.text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")
  40. private String content;
  41. @Field(type=FieldType.Date)
  42. private Date createdate;
  43. /**
  44. * @return id
  45. */
  46. public Integer getId() {
  47. return id;
  48. }
  49. /**
  50. * @param id
  51. */
  52. public void setId(Integer id) {
  53. this.id = id;
  54. }
  55. public String getVolume() {
  56. return volume;
  57. }
  58. public void setVolume(String volume) {
  59. this.volume = volume;
  60. }
  61. public String getChapter() {
  62. return chapter;
  63. }
  64. public void setChapter(String chapter) {
  65. this.chapter = chapter;
  66. }
  67. public String getContent() {
  68. return content;
  69. }
  70. public void setContent(String content) {
  71. this.content = content;
  72. }
  73. public Date getCreatedate() {
  74. return createdate;
  75. }
  76. public void setCreatedate(Date createdate) {
  77. this.createdate = createdate;
  78. }
  79. @Override
  80. public String toString() {
  81. return "Story{" +
  82. "id=" + id +
  83. ", volume='" + volume + '\'' +
  84. ", chapter='" + chapter + '\'' +
  85. ", content='" + content + '\'' +
  86. ", createdate=" + createdate +
  87. '}';
  88. }
  89. }

        StoryService

  1. package net.cnki.es.service;
  2. import com.github.pagehelper.PageInfo;
  3. import net.cnki.es.entity.Story;
  4. import org.springframework.data.domain.Page;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.Map;
  8. public interface StoryService{
  9. /**
  10. * 多条件模糊分页查询
  11. * @param searchMap
  12. * 查询条件以searchMap中key、value形式传入,其中key为要查询的字段名称,value为查询字段值
  13. * @param ordersMap
  14. * 排序条件以ordersMap中key、value形式传入,其中key为排序的字段名称,value为排序规则:desc/asc
  15. * @param pageUtil
  16. * 分页对象信息
  17. * @param clazz
  18. * @return
  19. * 返回一个map数据,包含:
  20. * map.("data",dataListInfo<T>);
  21. * map.("pageEsInfo",pageEsInfo);
  22. */
  23. <T> Map<String,Object> queryByMapParamsForPage(Map<String,Object> searchMap, Map<String, Object> ordersMap, PageInfo<T> pageUtil, Class<T> clazz);
  24. /**
  25. * 多条件模糊高亮显示分页查询
  26. * @param searchMap
  27. * 查询条件以searchMap中key、value形式传入,其中key为要查询的字段名称,value为查询字段值
  28. * @param ordersMap
  29. * 排序条件以ordersMap中key、value形式传入,其中key为排序的字段名称,value为排序规则:desc/asc
  30. * @param pageUtil
  31. * 分页对象信息
  32. * @param clazz
  33. * @param color
  34. * 设置高亮显示的颜色
  35. * @return
  36. * 返回一个map数据,包含:
  37. * map.("data",dataListInfo<T>);
  38. * map.("pageEsInfo",pageEsInfo);
  39. */
  40. <T> Map<String,Object> highLigthQueryForPage(Map<String,Object> searchMap, Map<String, Object> ordersMap, PageInfo<T> pageUtil, Class<T> clazz, String color);
  41. /**
  42. * 根据id删除es服务器上的数据(由于MySQL数据库同步数据到es时,无法同步MySQL中删除的历史数据,故需在es服务器上自行删除)
  43. * @param id
  44. * @param clazz
  45. * @return
  46. */
  47. <T> String deleteEsDataById(String id, Class<T> clazz);
  48. /**
  49. * 新增数据到es服务器
  50. * @param id
  51. * 当前数据的id值
  52. * @param modelJson
  53. * 新增对象的实体类,以json字符串格式传入
  54. * @param index
  55. * 新增对象的索引值
  56. * @param type
  57. * 新增对象的类型值
  58. * @return
  59. */
  60. String addEsData(String id, String modelJson, String index, String type);
  61. /**
  62. * 更新数据到es服务器
  63. * @param id
  64. * 当前数据的id值
  65. * @param modelMap
  66. * 更新对象属性值,以map格式传入
  67. * @param index
  68. * 更新对象的索引值
  69. * @param type
  70. * 更新对象的类型值
  71. * @param clazz
  72. * 更新对象类
  73. * @return
  74. */
  75. <T> String updEsData(String id, Map<String,Object> modelMap, String index, String type, Class<T> clazz);
  76. /**
  77. * 判断es服务器中是否有该id
  78. * @param id
  79. * @return
  80. */
  81. String getIdById(String id);
  82. /**
  83. * 保存单条记录
  84. * @param story
  85. */
  86. void save(Story story);
  87. /**
  88. * 保存多条记录
  89. * @param list
  90. */
  91. void saveAll(List<Story> list);
  92. /**
  93. * 查询所有记录
  94. * @return
  95. */
  96. Iterator<Story> findAll();
  97. Page<Story> findByContent(String content);
  98. Page<Story> findByFirstCode(String firstCode);
  99. Page<Story> findBySecordCode(String secordCode);
  100. Page<Story> query(String key);
  101. void getStoryData();
  102. }
StoryServiceImpl
  1. package net.cnki.es.service.impl;
  2. import java.net.URL;
  3. import java.util.*;
  4. import java.util.Map.Entry;
  5. import com.github.pagehelper.PageInfo;
  6. import net.cnki.es.dao.EsDataQueryRepository;
  7. import net.cnki.es.entity.Story;
  8. import net.cnki.es.service.StoryService;
  9. import org.elasticsearch.action.search.SearchResponse;
  10. import org.elasticsearch.action.update.UpdateRequest;
  11. import org.elasticsearch.action.update.UpdateResponse;
  12. import org.elasticsearch.index.query.BoolQueryBuilder;
  13. import org.elasticsearch.index.query.QueryBuilder;
  14. import org.elasticsearch.index.query.QueryBuilders;
  15. import org.elasticsearch.search.SearchHit;
  16. import org.elasticsearch.search.SearchHits;
  17. import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
  18. import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder.Field;
  19. import org.jsoup.Jsoup;
  20. import org.jsoup.nodes.Document;
  21. import org.jsoup.nodes.Element;
  22. import org.jsoup.select.Elements;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.data.domain.Page;
  25. import org.springframework.data.domain.PageRequest;
  26. import org.springframework.data.domain.Pageable;
  27. import org.springframework.data.domain.Sort;
  28. import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
  29. import org.springframework.data.elasticsearch.core.SearchResultMapper;
  30. import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
  31. import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;
  32. import org.springframework.data.elasticsearch.core.query.IndexQuery;
  33. import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
  34. import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
  35. import org.springframework.data.elasticsearch.core.query.SearchQuery;
  36. import org.springframework.data.elasticsearch.core.query.UpdateQuery;
  37. import org.springframework.stereotype.Service;
  38. import com.alibaba.druid.util.StringUtils;
  39. @Service
  40. public class StoryServiceImpl implements StoryService {
  41. @Autowired
  42. private ElasticsearchTemplate elasticsearchTemplate;
  43. @Autowired
  44. private EsDataQueryRepository esRepository;
  45. private Pageable pageable = PageRequest.of(0,10);
  46. @SuppressWarnings({ "deprecation", "static-access" })
  47. @Override
  48. public <T> Map<String, Object> queryByMapParamsForPage(Map<String, Object> searchMap, Map<String, Object> ordersMap,
  49. PageInfo<T> pageEsInfo, Class<T> clazz) {
  50. Map<String, Object> map = new HashMap<String,Object>();
  51. //封装字段查询条件
  52. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  53. if(searchMap!=null && !searchMap.isEmpty()) {
  54. for(Entry<String, Object> vo : searchMap.entrySet()) {
  55. String key = vo.getKey();
  56. String value = (String) vo.getValue();
  57. if(!StringUtils.isEmpty(value)) {
  58. boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.matchQuery(key, value));
  59. }
  60. }
  61. }
  62. QueryBuilder queryBuilder = boolQueryBuilder;
  63. SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(queryBuilder).build();
  64. //获取封装分页查询条件,默认取第一页
  65. Pageable pageable= null;
  66. if(pageEsInfo!=null) {
  67. Integer currentPageNum = pageEsInfo.getPageNum();
  68. if(currentPageNum!=null&&currentPageNum>0) {
  69. currentPageNum = currentPageNum-1;
  70. }else {
  71. map.put("success", false);
  72. map.put("msg", "当前页数有误,请输入大于0的正整数");
  73. return map;
  74. }
  75. Integer pageSize = pageEsInfo.getPageSize();
  76. if(pageSize==null || pageSize==0) {
  77. map.put("success", false);
  78. map.put("msg", "每页条数有误,请输入大于0的正整数");
  79. return map;
  80. }
  81. //获取封装排序条件
  82. Sort sort = null;
  83. if(ordersMap!=null && !ordersMap.isEmpty()) {
  84. for(Entry<String, Object> vo : ordersMap.entrySet()) {
  85. String key = vo.getKey();
  86. String value = (String) vo.getValue();
  87. if("desc".equals(value)) {
  88. sort = sort.by(key).descending();
  89. }else {
  90. sort = sort.by(key).ascending();
  91. }
  92. }
  93. }
  94. if(sort!=null) {
  95. pageable = new PageRequest(currentPageNum, pageSize,sort);
  96. }else {
  97. pageable = new PageRequest(currentPageNum, pageSize);
  98. }
  99. searchQuery = new NativeSearchQueryBuilder().withQuery(queryBuilder).withPageable(pageable).build();
  100. }
  101. List<T> list = elasticsearchTemplate.queryForList(searchQuery, clazz);
  102. Long count = elasticsearchTemplate.count(searchQuery);
  103. pageEsInfo.setTotal(count.intValue());
  104. pageEsInfo.setSize(count.intValue());
  105. map.put("data", list);
  106. map.put("pageEsInfo", pageEsInfo);
  107. return map;
  108. }
  109. @SuppressWarnings({ "deprecation", "static-access" })
  110. @Override
  111. public <T> Map<String, Object> highLigthQueryForPage(Map<String, Object> searchMap, Map<String, Object> ordersMap,
  112. PageInfo<T> pageEsInfo, Class<T> clazz,String color) {
  113. Map<String,Object> mapRel = new HashMap<String,Object>();
  114. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  115. //封装字段查询条件,及高亮显示字段
  116. String preTag = "<font color='red'>";//google的色值
  117. String postTag = "</font>";
  118. //不传默认红色
  119. if(!StringUtils.isEmpty(color)) {
  120. preTag = preTag.replace("red", color);
  121. }
  122. int size = 0;
  123. for(Entry<String, Object> vo : searchMap.entrySet()) {
  124. if(!StringUtils.isEmpty((String)vo.getValue())) {
  125. size = size+1;
  126. }
  127. }
  128. HighlightBuilder.Field[] highlightFields = new Field[size];
  129. if(searchMap!=null && !searchMap.isEmpty()) {
  130. int i=0;
  131. for(Entry<String, Object> vo : searchMap.entrySet()) {
  132. if(!StringUtils.isEmpty((String)vo.getValue())) {
  133. String key = vo.getKey();
  134. String value = (String) vo.getValue();
  135. if(!StringUtils.isEmpty(value)) {
  136. boolQueryBuilder = boolQueryBuilder.must(QueryBuilders.matchQuery(key, value));
  137. Field field = new HighlightBuilder.Field(key).preTags(preTag).postTags(postTag);
  138. highlightFields[i] = field;
  139. i = i+1;
  140. }
  141. }
  142. }
  143. }
  144. NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
  145. .withQuery(boolQueryBuilder).withHighlightFields(highlightFields).build();
  146. //封装分页信息
  147. Pageable pageable= null;
  148. if(pageEsInfo!=null) {
  149. Integer currentPageNum = pageEsInfo.getPageNum();
  150. if(currentPageNum!=null&&currentPageNum>0) {
  151. currentPageNum = currentPageNum-1;
  152. }else {
  153. mapRel.put("success", false);
  154. mapRel.put("msg", "当前页数有误,请输入大于0的正整数");
  155. return mapRel;
  156. }
  157. Integer pageSize = pageEsInfo.getPageSize();
  158. if(pageSize==null || pageSize==0) {
  159. mapRel.put("success", false);
  160. mapRel.put("msg", "每页条数有误,请输入大于0的正整数");
  161. return mapRel;
  162. }
  163. //封装排序条件
  164. Sort sort = null;
  165. if(ordersMap!=null && !ordersMap.isEmpty()) {
  166. for(Entry<String, Object> vo : ordersMap.entrySet()) {
  167. String key = vo.getKey();
  168. String value = (String) vo.getValue();
  169. if("desc".equals(value)) {
  170. sort = sort.by(key).descending();
  171. }else {
  172. sort = sort.by(key).ascending();
  173. }
  174. }
  175. }
  176. if(sort!=null) {
  177. pageable = new PageRequest(currentPageNum, pageSize,sort);
  178. }else {
  179. pageable = new PageRequest(currentPageNum, pageSize);
  180. }
  181. searchQuery = new NativeSearchQueryBuilder()
  182. .withQuery(boolQueryBuilder).withPageable(pageable)
  183. .withHighlightFields(highlightFields).build();
  184. }
  185. Page<T> page = elasticsearchTemplate.queryForPage(searchQuery, clazz, new SearchResultMapper() {
  186. @SuppressWarnings({ "unchecked", "hiding" })
  187. @Override
  188. public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
  189. ArrayList<Map<String,Object>> poems = new ArrayList<Map<String,Object>>();
  190. SearchHits hits = response.getHits();
  191. for (SearchHit searchHit : hits) {
  192. if (hits.getHits().length <= 0) {
  193. return null;
  194. }
  195. Map<String,Object> mapResult = searchHit.getSourceAsMap();
  196. for(Entry<String, Object> vo : searchMap.entrySet()) {
  197. String key = vo.getKey();
  198. //将高亮内容设置进去
  199. String valueSearch = (String) vo.getValue();
  200. if(!StringUtils.isEmpty(valueSearch)) {
  201. String highLightMessage = searchHit.getHighlightFields().get(key).fragments()[0].toString();
  202. mapResult.put(key, highLightMessage);
  203. }
  204. }
  205. poems.add(mapResult);
  206. }
  207. if (poems.size() > 0) {
  208. return new AggregatedPageImpl<T>((List<T>) poems);
  209. }
  210. return null;
  211. }
  212. });
  213. List<T> poems = new ArrayList<T>();
  214. if(page!=null) {
  215. poems = (List<T>) page.getContent();
  216. }
  217. Long count = elasticsearchTemplate.count(searchQuery);
  218. int num = (int) count.longValue();
  219. pageEsInfo.setTotal(count.intValue());
  220. pageEsInfo.setSize(count.intValue());
  221. //pageEsInfo.setPages(count.intValue());
  222. if(num%pageEsInfo.getPageSize()==0){
  223. pageEsInfo.setPages(num%pageEsInfo.getPageSize());
  224. }else {
  225. pageEsInfo.setPages(num%pageEsInfo.getPageSize()+1);
  226. }
  227. pageEsInfo.setList(poems);
  228. mapRel.put("data", poems);
  229. mapRel.put("pageEsInfo", pageEsInfo);
  230. return mapRel;
  231. }
  232. @Override
  233. public <T> String deleteEsDataById(String id, Class<T> clazz) {
  234. String delId = elasticsearchTemplate.delete(clazz, id);
  235. return delId;
  236. }
  237. @Override
  238. public String addEsData(String id, String modelJson, String index, String type) {
  239. IndexQuery indexQuery = new IndexQuery();
  240. indexQuery.setId(id);
  241. indexQuery.setSource(modelJson);
  242. indexQuery.setIndexName(index);
  243. indexQuery.setType(type);
  244. String strId = elasticsearchTemplate.index(indexQuery);
  245. return strId;
  246. }
  247. @Override
  248. public <T> String updEsData(String id, Map<String, Object> modelMap, String index, String type, Class<T> clazz) {
  249. UpdateQuery updateQuery = new UpdateQuery();
  250. updateQuery.setId(id);
  251. updateQuery.setClazz(clazz);
  252. updateQuery.setIndexName(index);
  253. updateQuery.setType(type);
  254. UpdateRequest updateRequest = new UpdateRequest();
  255. updateRequest.doc(modelMap);
  256. updateQuery.setUpdateRequest(updateRequest);
  257. UpdateResponse up = elasticsearchTemplate.update(updateQuery);
  258. return up.getId();
  259. }
  260. @Override
  261. public String getIdById(String id) {
  262. // 构造搜索条件
  263. SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("id", id))).build();
  264. List<String> ids = elasticsearchTemplate.queryForIds(searchQuery);
  265. if(ids!=null&&ids.size()>0) {
  266. return ids.get(0);
  267. }
  268. return null;
  269. }
  270. @Override
  271. public void save(Story docBean) {
  272. esRepository.save(docBean);
  273. }
  274. @Override
  275. public void saveAll(List<Story> list) {
  276. esRepository.saveAll(list);
  277. }
  278. @Override
  279. public Iterator<Story> findAll() {
  280. return esRepository.findAll().iterator();
  281. }
  282. @Override
  283. public Page<Story> findByContent(String content) {
  284. return esRepository.findByContent(content,pageable);
  285. }
  286. @Override
  287. public Page<Story> findByFirstCode(String firstCode) {
  288. return esRepository.findByFirstCode(firstCode,pageable);
  289. }
  290. @Override
  291. public Page<Story> findBySecordCode(String secordCode) {
  292. return esRepository.findBySecordCode(secordCode,pageable);
  293. }
  294. @Override
  295. public Page<Story> query(String key) {
  296. return esRepository.findByContent(key,pageable);
  297. }
  298. /**
  299. * 爬取小说数据
  300. * 小说名:剑来
  301. */
  302. @Override
  303. public void getStoryData() {
  304. try {
  305. int htmlPages = 2324764;
  306. int id = 1;
  307. String urlstart = "http://www.shuquge.com/txt/8659/";
  308. String url = "http://www.shuquge.com/txt/8659/2324752.html";
  309. String volume = "第一卷 笼中雀";
  310. while (htmlPages<=38147404) {
  311. //拼接url
  312. //url = urlstart + htmlPages + ".html";
  313. Document document = Jsoup.parse(new URL(url), 30000);
  314. //获取小说内容元素
  315. Elements element = document.getElementsByClass("content");
  316. //获取结果集的第一个元素
  317. Element li = element.first();
  318. //章节
  319. String chapter = li.getElementsByTag("h1").eq(0).html();
  320. //内容
  321. String content = li.getElementById("content").html();
  322. Story story = new Story();
  323. //小说详情页没有第几卷,根据章节判断
  324. if (url.contains("2324837")) {
  325. volume = "第二卷 山水郎";
  326. }
  327. story.setId(id);
  328. story.setVolume(volume);
  329. story.setChapter(chapter);
  330. story.setContent(content);
  331. Date date = new Date();
  332. story.setCreatedate(date);
  333. System.out.println(story.toString());
  334. //插入es
  335. save(story);
  336. //获取下一章节地址
  337. Element nextChapterElement = document.getElementsByClass("page_chapter").first();
  338. String nextChapter = nextChapterElement.getElementsByTag("a").eq(2).attr("href");
  339. if(nextChapter.contains("index.html")){
  340. return;
  341. }
  342. url = urlstart + nextChapter;
  343. htmlPages++;
  344. id++;
  345. }
  346. }catch(Exception e){
  347. }
  348. }
  349. }

EsDataQueryRepository
  1. package net.cnki.es.dao;
  2. import net.cnki.es.entity.Story;
  3. import org.springframework.data.domain.Page;
  4. import org.springframework.data.domain.Pageable;
  5. import org.springframework.data.elasticsearch.annotations.Query;
  6. import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
  7. public interface EsDataQueryRepository extends ElasticsearchRepository<Story, Long> {
  8. //默认的注释
  9. //@Query("{\"bool\" : {\"must\" : {\"field\" : {\"content\" : \"?\"}}}}")
  10. Page<Story> findByContent(String content, Pageable pageable);
  11. @Query("{\"bool\" : {\"must\" : {\"field\" : {\"firstCode.keyword\" : \"?\"}}}}")
  12. Page<Story> findByFirstCode(String firstCode, Pageable pageable);
  13. @Query("{\"bool\" : {\"must\" : {\"field\" : {\"secordCode.keyword\" : \"?\"}}}}")
  14. Page<Story> findBySecordCode(String secordCode, Pageable pageable);
  15. }

StoryController
  1. package net.cnki.es.controller;
  2. import com.alibaba.fastjson.JSON;
  3. import com.github.pagehelper.PageHelper;
  4. import com.github.pagehelper.PageInfo;
  5. import net.cnki.es.entity.Story;
  6. import net.cnki.es.service.StoryService;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Controller;
  9. import org.springframework.util.StringUtils;
  10. import org.springframework.web.bind.annotation.GetMapping;
  11. import org.springframework.web.bind.annotation.RequestMapping;
  12. import org.springframework.web.bind.annotation.RequestParam;
  13. import org.springframework.web.bind.annotation.ResponseBody;
  14. import java.util.Date;
  15. import java.util.HashMap;
  16. import java.util.Iterator;
  17. import java.util.Map;
  18. @Controller
  19. @RequestMapping("es")
  20. public class StoryController {
  21. @Autowired
  22. private StoryService storyService;
  23. /**
  24. * 查询页面
  25. * @return
  26. */
  27. @GetMapping("/search")
  28. public String searchPage() {
  29. //ClassUtils.getDefaultClassLoader().getResource("templates").getPath()+
  30. return "pages/elasticsearch/search.html";
  31. }
  32. /**
  33. * 查询所有数据
  34. */
  35. @RequestMapping("/all")
  36. @ResponseBody
  37. public Iterator<Story> all(){
  38. return storyService.findAll();
  39. }
  40. /**
  41. * 抓取网页数据
  42. */
  43. @RequestMapping("/getStoryData")
  44. public void getStoryData(){
  45. storyService.getStoryData();
  46. }
  47. /**
  48. * 分页查询
  49. * @return
  50. */
  51. @RequestMapping("getStoryList")
  52. @ResponseBody
  53. public PageInfo getStoryList(@RequestParam(value = "limit",defaultValue = "10")Integer limit,
  54. @RequestParam(value = "page",defaultValue = "1")Integer page,String content,String chapter) {
  55. Map<String,Object> mapParams = new HashMap<String,Object>();
  56. mapParams.put("content", content);
  57. mapParams.put("chapter", chapter);
  58. PageInfo<Story> pageInfo = new PageInfo<Story>();
  59. PageHelper.startPage(page,limit);
  60. pageInfo.setPageNum(page);
  61. pageInfo.setPageSize(limit);
  62. Map<String,Object> mapOrders = new HashMap<String,Object>();
  63. mapOrders.put("id", "desc");
  64. Map<String,Object> map = new HashMap<String,Object>();
  65. map= storyService.highLigthQueryForPage(mapParams, mapOrders, pageInfo, Story.class,"");
  66. return (PageInfo)map.get("pageEsInfo");
  67. }
  68. /**
  69. * 分页查询
  70. */
  71. @RequestMapping("searchByPage")
  72. @ResponseBody
  73. public String getSyslogListTest2() {
  74. Map<String,Object> mapParams = new HashMap<String,Object>();
  75. mapParams.put("content", "平安");
  76. PageInfo<Story> pageInfo = new PageInfo<Story>();
  77. pageInfo.setPageNum(3);
  78. pageInfo.setPageSize(5);
  79. Map<String,Object> mapOrders = new HashMap<String,Object>();
  80. mapOrders.put("_id", "desc");
  81. Map<String,Object> map = new HashMap<String,Object>();
  82. map = storyService.queryByMapParamsForPage(mapParams, mapOrders, pageInfo, Story.class);
  83. return JSON.toJSONString(map);
  84. }
  85. /**
  86. * 高亮分页查询
  87. * @return
  88. */
  89. @RequestMapping("searchByHighPage")
  90. @ResponseBody
  91. public String getSyslogListTest1() {
  92. Map<String,Object> mapParams = new HashMap<String,Object>();
  93. mapParams.put("content", "东");
  94. PageInfo<Story> pageInfo = new PageInfo<Story>();
  95. pageInfo.setPageNum(1);
  96. pageInfo.setPageSize(5);
  97. Map<String,Object> mapOrders = new HashMap<String,Object>();
  98. mapOrders.put("id", "desc");
  99. Map<String,Object> map = new HashMap<String,Object>();
  100. map = storyService.highLigthQueryForPage(mapParams, mapOrders, pageInfo, Story.class,"#53FF53");
  101. return JSON.toJSONString(map);
  102. }
  103. /**
  104. * 删除记录
  105. * @return
  106. */
  107. @RequestMapping("delById")
  108. @ResponseBody
  109. public String delSyslogByIdTest() {
  110. String id = "1";
  111. if(!StringUtils.isEmpty(id)) {
  112. String delId = storyService.deleteEsDataById(id, Story.class);
  113. if(id.equals(delId)) {
  114. return "true";
  115. }else {
  116. return "false";
  117. }
  118. }else {
  119. return "false";
  120. }
  121. }
  122. /**
  123. * 添加记录
  124. * @return
  125. */
  126. @RequestMapping("addSyslog")
  127. @ResponseBody
  128. public String addSyslogInfoTest() {
  129. Story sysLog = new Story();
  130. sysLog.setId(100);
  131. sysLog.setVolume("skq");
  132. sysLog.setChapter("手动添加");
  133. sysLog.setContent("shoudong");
  134. sysLog.setCreatedate(new Date());
  135. String modelJson = JSON.toJSONString(sysLog);
  136. String id = sysLog.getId().toString();
  137. String strId = storyService.addEsData(id, modelJson, "syslog", "syslog");
  138. return strId;
  139. }
  140. /**
  141. * 更新记录
  142. * @return
  143. */
  144. @RequestMapping("updSyslog")
  145. @ResponseBody
  146. public String updSyslogInfoTest() {
  147. Map<String,Object> modelMap = new HashMap<String,Object>();
  148. modelMap.put("id", 100);
  149. modelMap.put("user_name", "skq12");
  150. modelMap.put("type", "sd");
  151. String strId = storyService.updEsData("100", modelMap, "syslog", "syslog", Story.class);
  152. return strId;
  153. }
  154. }

           5、测试es全文检索,在service中写了爬取剑来小说的方法,将爬取的每一章内容直接存到es中,效果如下图所示:

 

 

 

四、利用logstash实现MySQL中的数据全量/增量同步到elasticsearch服务器中(window10环境)

        1logstash的下载安装(logstash的安装时路径不要包含空格,某种在数据同步的时候会数据同步不成功)

             logstash的官网下载地址:https://www.elastic.co/downloads/logstash,注意下载的版本要与elasticsearch版本必须一直,如当前elasticsearch的版本是6.7.2,则logstash的版本也必须是6.7.2

            注意,在网上各种查找资料发现好多资料都需集成logstash-jdbc-input插件才能实现数据同步,后台才发现这个和版本有关系,在elasticsearch5.X及之后的版本是不需要集成该插件即可

            下载好相应的版本后解压即可(注意解压的路径,最好不要有中文和空格)。

        2logstash配置

           1)在logstash-6.7.2路径下创建空文件夹,如mysql,用来存放相关配置文件等

           2)在创建的新文件夹中(mysql文件夹)中放入驱动包:mysql-connector-java.jar

           3)在创建的新文件夹中(mysql文件夹)中创建一个sql文件,如find.sql,从这里开始,就是logstash同步数据库的核心操作了,在这里创建的sql文件主要内容是:mysql需要同步Elasticsearch的具体数据的查询方式,如果是全量同步,只需要select * from [table]即可

            4)在创建的新文件夹中(mysql文件夹)中创建一个conf文件,如jdbc.conf文件,该文件用于链接数据库和elasticsearch,其内容为:(注意,有的可能不识别注解,运行时需要将注解去掉)

  1. input {
  2. stdin {
  3. }
  4. jdbc {
  5. jdbc_connection_string => "jdbc:mysql://localhost:3306/quick_platform?characterEncoding=utf8&useSSL=false&serverTimezone=UTC"
  6. jdbc_user => "root"
  7. jdbc_password => "cnki_TTOD_7"
  8. jdbc_driver_library => "D:\ELK\logstash-6.7.2\mysql\mysql-connector-java-8.0.11.jar"
  9. jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  10. jdbc_paging_enabled => "true"
  11. jdbc_page_size => "50000"
  12. clean_run => false
  13. jdbc_default_timezone => "Asia/Shanghai"
  14. statement => "SELECT * FROM story where createdate > :sql_last_value order by createdate desc"
  15. schedule => "* * * * *"
  16. type => "sy"
  17. lowercase_column_names => false
  18. record_last_run => true
  19. use_column_value => true
  20. tracking_column => "createdate"
  21. tracking_column_type => "timestamp"
  22. last_run_metadata_path => "D:\ELK\logstash-6.7.2\mysql\lw_last_time"
  23. clean_run => false
  24. }
  25. }
  26. filter {
  27. json {
  28. source => "message"
  29. remove_field => ["message"]
  30. }
  31. }
  32. output {
  33. elasticsearch {
  34. hosts => "127.0.0.1:9200"
  35. index => "mysql_story"
  36. }
  37. stdout {
  38. codec => json_lines
  39. }
  40. }

网上的第二中配置,部分不同而已:

  1. input {
  2.     stdin {
  3.     }
  4.     jdbc {
  5.       jdbc_connection_string => "jdbc:mysql://localhost:3306/你的数据库名字"
  6.         jdbc_user => "你的数据库用户名"
  7.         jdbc_password => "你的数据库密码"
  8.         jdbc_driver_library => "C:/logstash/bin/mysql-connector-java-5.1.44-bin.jar"
  9.         jdbc_driver_class => "com.mysql.jdbc.Driver"
  10.         jdbc_paging_enabled => "true"
  11.         jdbc_page_size => "50000"
  12.         statement => "SELECT id(主键),其他内容 FROM 你的表"
  13.         schedule => "* * * * *"
  14.     }
  15. }
  16. output {
  17.      stdout {
  18.         codec => json_lines
  19.     }
  20.     elasticsearch {
  21.         hosts => "localhost:9200"
  22.         index => "你要创建的索引名"
  23.         document_type => "你要创建的索引类型"
  24.         document_id => "%{id}"
  25.     }
  26. }

另一个说明比较详细的版本:

  1. input {
  2.     stdin {
  3.     }
  4.     jdbc {
  5.       # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
  6.       jdbc_connection_string => "jdbc:mysql://数据库地址:端口号/数据库名?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
  7.       # 你的账户密码
  8.       jdbc_user => "账号"
  9.       jdbc_password => "密码"
  10.       # 连接数据库的驱动包,建议使用绝对地址
  11.       jdbc_driver_library => "mysql/mysql-connector-java-5.1.45-bin.jar"
  12.       # 这是不用动就好
  13.       jdbc_driver_class => "com.mysql.jdbc.Driver"
  14.       jdbc_paging_enabled => "true"
  15.       jdbc_page_size => "50000"
  16.  
  17.     #处理中文乱码问题
  18.       codec => plain { charset => "UTF-8"}
  19.  
  20.        #使用其它字段追踪,而不是用时间
  21.       use_column_value => true
  22.        #追踪的字段      
  23.     tracking_column => testid      
  24.     record_last_run => true     
  25.     #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值     
  26.     last_run_metadata_path => "mysql/station_parameter.txt"
  27.  
  28.       jdbc_default_timezone => "Asia/Shanghai"
  29.  
  30.       statement_filepath => "mysql/jdbc.sql"
  31.       
  32.     #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
  33.     clean_run => false
  34.  
  35.       # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟
  36.       schedule => "*/5 * * * *"
  37.       type => "jdbc"
  38.     }
  39. }
  40.  
  41. filter {
  42.     json {
  43.         source => "message"
  44.         remove_field => ["message"]
  45.     }
  46. }
  47.  
  48. output {
  49.     elasticsearch {
  50.         # 要导入到的Elasticsearch所在的主机
  51.         hosts => "192.168.105.180:9200"
  52.         # 要导入到的Elasticsearch的索引的名称
  53.         index => "db_anytest"
  54.         # 类型名称(类似数据库表名)
  55.         document_type => "table_anytest"
  56.         # 主键名称(类似数据库主键)
  57.         document_id => "%{testid}"
  58.         # es 账号
  59.         user => elastic
  60.         password => changeme
  61.         
  62.     }
  63.  
  64.     stdout {
  65.         # JSON格式输出
  66.         codec => json_lines
  67.     }
  68. }

        3、启动logstash开始同步数据库

              1)确保elasticsearch服务已启动,并且要同步的表里有相应的数据

              2cmd一个新窗口,进入到D:\logstash-6.7.2\bin

              3)运行命令logstash -f ../mysql/jdbc.conf  ,其中logstash -f表示运行指令, ../mysql/jdbc.conf表示我们配置的jdbc.conf文件路径,成功启动后,可以在终端中看见运行的sql和同步的数据,如图:https://oscimg.oschina.net/oscnet/22c18bfb83c90d6361283daeb10b2a038f9.jpg

               4)同步成功后即可在head中查看同步到elasticsearch中的数据,其中timestampversionelastisearch自己添加的字段。​​​​​​​

                5)注意点:

                        a、在同步的时候,如果是首次全量同步的话,可以不需要在elasticsearch中去新建索引和类型,同步的时候会根据配置自动创建

                        b、若是增量更新的话,在sql中添加查询条件即可,如

                    where  testid >= :sql_last_start

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/936259
推荐阅读
相关标签
  

闽ICP备14008679号