当前位置:   article > 正文

Elasticsearch零基础实战_elasticsearch 指定启动空间

elasticsearch 指定启动空间

分享后可优化点(待完成)

java es8 查询如何打印查询入参 ?(直接执行的json

es自定义分词器 如何实现?

kibana 监控jvm分子分母是什么 ?

es如何 改索引结构?

修改数据原理

分享前罗列大纲,挑出重点

分享时需要讲 原理+应用场景

根据笔记总结分享文档,代码截图等 (不要直接拿笔记出来讲)

任务目标

  1. 基础语法学习
  2. 从es6.8迁移至7.1
  3. 从es6.8迁移至8.7.0
  4. 相关java api升级(从spring boot 封住的es框架到原生es)
  5. 业务操作:订单历史数据从mysql迁移至es
  6. 断路器配置
  7. 监控器配置
  8. 快照配置

windows本地环境搭建(http)

下载es

es下载地址

es集群搭建

解压一个es8.7的zip,然后复制三份,像这样

然后对config下的 elasticsearch.yml 分别进行设置

  1. #节点 1 的配置信息:
  2. # ---------------------------------- Cluster -----------------------------------
  3. # 集群名称,节点之间要保持一致
  4. cluster.name: my-application
  5. # ------------------------------------ Node ------------------------------------
  6. # 节点名称,集群内要唯一
  7. node.name: node-1
  8. # 节点角色 [主节点、数据节点]:注意这一点同7.x版本的区别配置
  9. node.roles: [master,data]
  10. # ---------------------------------- Network -----------------------------------
  11. # 发布ip
  12. network.host: localhost
  13. # http 端口
  14. http.port: 9200
  15. # tcp 监听端口:注意这一点同7.x版本的区别配置
  16. transport.port: 9301
  17. # --------------------------------- Discovery ----------------------------------
  18. # 初始主节点
  19. cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
  20. # ---------------------------------- Various -----------------------------------
  21. # 删除索引是否需要指定索引全名(false就是可以用正则)
  22. #action.destructive_requires_name: false
  23. # 跨域配置
  24. http.cors.enabled: true
  25. http.cors.allow-origin: "*"
  1. # 节点 2 的配置信息:
  2. # ---------------------------------- Cluster -----------------------------------
  3. # 集群名称,节点之间要保持一致
  4. cluster.name: my-application
  5. # ------------------------------------ Node ------------------------------------
  6. # 节点名称,集群内要唯一
  7. node.name: node-2
  8. # 节点角色 [主节点、数据节点]
  9. node.roles: [master,data]
  10. # ---------------------------------- Network -----------------------------------
  11. # 发布ip
  12. network.host: localhost
  13. # http 端口
  14. http.port: 9201
  15. # tcp 监听端口
  16. transport.port: 9302
  17. # --------------------------------- Discovery ----------------------------------
  18. # 自动发现集群节点ip
  19. discovery.seed_hosts: ["localhost:9301"]
  20. # discovery.zen.fd.ping_timeout: 1m
  21. # discovery.zen.fd.ping_retries: 5
  22. # 集群内的可以被选为主节点的节点列表
  23. cluster.initial_master_nodes: ["node-1", "node-2","node-3"]
  24. # ---------------------------------- Various -----------------------------------
  25. # 跨域配置
  26. http.cors.enabled: true
  27. http.cors.allow-origin: "*"
  1. #节点 3 的配置信息:
  2. # ---------------------------------- Cluster -----------------------------------
  3. # 集群名称,节点之间要保持一致
  4. cluster.name: my-application
  5. # ------------------------------------ Node ------------------------------------
  6. # 节点名称,集群内要唯一
  7. node.name: node-3
  8. # 节点角色 [主节点、数据节点]
  9. node.roles: [master,data]
  10. # ---------------------------------- Network -----------------------------------
  11. # 发布ip
  12. network.host: localhost
  13. # http 端口
  14. http.port: 9203
  15. # tcp 监听端口
  16. transport.port: 9303
  17. # --------------------------------- Discovery ----------------------------------
  18. # 自动发现集群节点ip
  19. discovery.seed_hosts: ["localhost:9301", "localhost:9302"]
  20. # discovery.zen.fd.ping_timeout: 1m
  21. # discovery.zen.fd.ping_retries: 5
  22. # 集群内的可以被选为主节点的节点列表
  23. cluster.initial_master_nodes: ["node-1", "node-2","node-3"]
  24. # ---------------------------------- Various -----------------------------------
  25. # 跨域配置
  26. http.cors.enabled: true
  27. http.cors.allow-origin: "*"

然后一次性都启动

  1. cd E:\es\node\node-1\bin
  2. .\elasticsearch.bat
  3. cd E:\es\node\node-2\bin
  4. .\elasticsearch.bat
  5. cd E:\es\node\node-3\bin
  6. .\elasticsearch.bat

注意点

直接集群启动,es默认走http,

如果不配置集群,单节点,启动,es会在配置文件中添加https相关配置

问题

提示jvm内存不足

修改下内存

找到elasticsearch的安装目录,然后找到config文件夹,里面都是相关的配置文件。

官方不建议直接修改jvm.options,而是复制jvm.options到jvm.options.d目录下,再修改。

其中,jvm.options可以修改es运行时候的内存分配。打开jvm.options文件,我们可以发现默认设置的内存是4g。

我们改成2g

elasticsearch 内存大小设置

忘记密码

可能第一次启动的时候没有保存密码,在bin目录下执行命令:

  1. # 随机密码
  2. ./elasticsearch-reset-password -u elastic

注意:在某个节点执行一次即可,所有节点都会生效

参考文档

Windows环境下es8的集群部署

ElasticSearch集群搭建

es8.7官方文档

kibana启动

设置中文

在config的 kibana.yml 中新增

i18n.locale: "zh-CN"

启动

由于es默认开启了security,但是我们又没配置证书,所以kibana直接启动会出错,

需要在三个es节点的配置文件添加如下配置

xpack.security.enabled: false

然后启动

  1. cd E:\es\kibana-8.7.0-windows-x86_64\kibana-8.7.0\bin
  2. .\kibana.bat

大功告成!

windows本地环境搭建(https)

es集群搭建

在http步骤上引入https,之前步骤请查看http部分

证书生成

  1. 在bin目录下执行命令,生成证书
  2. ./elasticsearch-certutil ca
  3. ./elasticsearch-certutil cert --ca elastic-stack-ca.p12

然后将这两个证书文件拷贝到三个node节点的config的certs文件夹下

在配置文件中新增配置(三个节点都需要)

  1. # 开启https协议
  2. xpack.security.enabled: true
  3. # 这行似乎不需要,测试环境copy过来的,先加上
  4. xpack.license.self_generated.type: basic
  5. xpack.security.transport.ssl.enabled: true
  6. xpack.security.transport.ssl.verification_mode: certificate
  7. xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
  8. xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12

然后重启es

kibana配置

修改 kibana_system 的密码

  1. 在node1节点的bin目录下执行
  2. ./elasticsearch-reset-password -u kibana_system

修改 kibana_admin 的密码

  1. ./elasticsearch-reset-password -u kibana_admin
  2. 密码:

config目录下的 kibana.yml 添加如下配置

  1. server.port: 5601
  2. server.host: "localhost"
  3. server.name: "node-1"
  4. elasticsearch.hosts: ["http://localhost:9200","http://localhost:9201","http://localhost:9203"]
  5. elasticsearch.ssl.verificationMode: none
  6. elasticsearch.requestTimeout: 90000
  7. elasticsearch.username: "kibana_system"
  8. # elasticsearch.password: "Mvwm@n12nal"
  9. elasticsearch.password: "Ql2e3HvwXUL9QfBlb+06"

然后启动,在bin目录执行

./kibana.bat

在浏览器输入网址 http://localhost:5601/

使用 elastic账户登录

参考文档

ES、Kibana 8.0安装

elasticsearch8.2.0 初始化忘记密码重置

kibana设置中文

windows环境下es8的集群部署

linux环境搭建

基础语法

注意:es8的时候,已经默认不支持* 或者 _all了

关于删除命令的说明(官网)

查询

  1. # 查询所有节点
  2. GET /_cat/nodes?v
  3. # 查询所有索引
  4. GET /_cat/indices?v
  5. GET /_cat/indices?v&h=health,status,index
  6. # 获取所有的索引mapping信息
  7. GET _all/_mapping
  8. #获取当前索引信息
  9. GET /community_encyclopedia
  10. GET /community_encyclopedia/_settings
  11. GET /community_encyclopedia/_mapping
  12. # 查询当前索引总数
  13. GET /community_encyclopedia/_count
  14. # 查询当前索引数据
  15. GET /community_encyclopedia/_search
  16. # 查询当前索引单条数据
  17. GET /community_encyclopedia/_doc/100
  18. #只获取字段name,age
  19. GET /bamboo/_doc/1?_source=name,age
  20. #查询参数
  21. GET /community_encyclopedia/_search
  22. {
  23. "query":{
  24. "bool":{
  25. "must":[
  26. {
  27. "match_phrase": {
  28. "type":1
  29. }
  30. },{
  31. "match_phrase": {
  32. "status":1
  33. }
  34. }
  35. ]
  36. }
  37. },
  38. "sort":[
  39. {
  40. "normalIndex":"asc"
  41. },
  42. {
  43. "createTime.keyword":"desc"
  44. }
  45. ]
  46. }
  47. # 范围查询
  48. #查询参数
  49. GET /t_car_order/_search
  50. {
  51. "query":{
  52. "range": {
  53. "create_time": {
  54. "gte": "2017-07-24T21:26:21.000Z",
  55. "lte": "2017-07-25T00:26:21.000Z"
  56. }
  57. }
  58. }
  59. }
  60. # 聚集查询 id为1,2的数据
  61. GET /bamboo/_doc/_mget
  62. {
  63. "docs":[
  64. {
  65. "_id": 2
  66. },
  67. {
  68. "_id": 1
  69. }
  70. ]
  71. }
  72. GET /bamboo/_doc/_search
  73. {"query":{"bool":{"must":[{"match_all":{}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}
  74. # 模糊查询
  75. GET /community_encyclopedia/_search
  76. {
  77. "query":{
  78. "bool": {
  79. "must": [
  80. {"wildcard": {"managerQuestion.keyword": "*汽车保险包括*"}}
  81. ]
  82. }
  83. }
  84. }
  85. # 多条件查询
  86. GET /t_car_policy/_search
  87. {
  88. "query":{
  89. "bool":{
  90. "must":[
  91. {
  92. "match": {
  93. "orderNo":"DCARRxFyaf000060147"
  94. }
  95. },{
  96. "match": {
  97. "appPolicyNo":"0106003202023026408"
  98. }
  99. }
  100. ],
  101. "filter": [
  102. {
  103. "range": {
  104. "createTime": {
  105. "lt": "2023-10-23 00:00:00"
  106. }
  107. }
  108. }
  109. ]
  110. }
  111. }
  112. }
  113. 上面的语法中,filter不参与评分,也可以直接放在must中:
  114. GET /t_car_policy/_search
  115. {
  116. "query":{
  117. "bool":{
  118. "must":[
  119. {
  120. "match": {
  121. "orderNo":"DCARRxFyaf000060147"
  122. }
  123. },
  124. {
  125. "match": {
  126. "appPolicyNo":"0106003202023026408"
  127. }
  128. },
  129. {
  130. "range": {
  131. "createTime": {
  132. "lt": "2023-10-23 00:00:00"
  133. }
  134. }
  135. }
  136. ]
  137. }
  138. }
  139. }
  140. 以上是匹配查询,更建议使用 .keyword 精确查询
  141. #查询参数
  142. GET /t_car_policy/_search
  143. {
  144. "query":{
  145. "bool":{
  146. "must":[
  147. {
  148. "term": {
  149. "orderNo.keyword":"DCARRxFyaf000060147"
  150. }
  151. },
  152. {
  153. "term": {
  154. "appPolicyNo.keyword":"0106003202023026408"
  155. }
  156. },
  157. {
  158. "range": {
  159. "createTime": {
  160. "lt": "2023-10-23 00:00:00"
  161. }
  162. }
  163. }
  164. ]
  165. }
  166. }
  167. }

删除

  1. # 删除所有索引
  2. # DELETE /_all
  3. # 删除部分索引
  4. # DELETE /test-*
  5. # 删除单个索引
  6. # DELETE /bamboo
  7. # 删除某条数据
  8. # DELETE /bamboo/_doc/1

创建

  1. # 创建空索引
  2. PUT /bamboo
  3. # 创建索引和对应的mapping和setting
  4. PUT /bamboo
  5. {
  6. "mappings": {
  7. "properties": {
  8. "title": { "type": "text" },
  9. "name": { "type": "text" },
  10. "age": { "type": "integer" },
  11. "created": {
  12. "type": "date",
  13. "format": "strict_date_optional_time||epoch_millis"
  14. }
  15. }
  16. },
  17. "settings":{
  18. "index":{
  19. "number_of_shards": 5,
  20. "number_of_replicas": 1
  21. }
  22. }
  23. }
  24. #添加一条数据
  25. PUT /bamboo/_doc/1
  26. {
  27. "name":"zs",
  28. "title":"张三",
  29. "age":18,
  30. "created":"2018-12-25"
  31. }
  32. # 修改一条数据的某个属性值
  33. PUT /bamboo/_doc/1
  34. {
  35. "name":"lxs",
  36. "title":"李小四"
  37. }
  38. # 批量插入多个document,_id不指定则系统生成字符串
  39. POST /bamboo/_doc/_bulk
  40. {"index":{"_id":2}}
  41. {"name":"ww","title":"王五","age":18,"created":"2018-12-27"}
  42. {"index":{}}
  43. {"name":"zl","title":"赵六","age":25,"created":"2018-12-27"}
  44. # 批量操作(包含修改和删除)
  45. POST /bamboo/_doc/_bulk
  46. {"update":{"_id":"1"}}
  47. {"doc":{"title":"王小五"}}
  48. {"delete":{"_id":"2"}}

参考链接:语法

各关键字用法及详细示例--java api

es8语法

must, filter, should, must_not, constant_score的区别

and or

should:其查询子句应该被满足,也就是不一定都满足,逻辑相当于 or。

must:其查询子句必须全部被满足,逻辑相当于 and ,并且会计算分数。

filter:与 must 作用一样,但是不会计算分数。在 filter context 下的查询子句不会计算分数且会被缓存。

must_not:其查询子句必须都不被满足。当子句是在 filter context 下时,不会计算分数且会被缓存。

参考文档:

Elasticsearch 中 must, filter, should, must_not, constant_score 的区别

原理解析

查询es的某个索引,原理是怎样的?

引用一段话:

被混淆的概念是,一个 Lucene 索引我们在 Elasticsearch 称作分片 。 而在Elasticsearch中索引是分片的集合。 当 Elasticsearch 在索引中搜索的时候, 他发送查询到每一个属于索引的分片(Lucene 索引),然后合并每个分片的结果到一个全局的结果集。

总结来说,查询某个索引时,会根据查询算法定位到某些分片,进而缩小查询范围,提高查询效率

7.*中为什么现在要移除type?

  1. 和数据库的库-表类比是错误的,因为es中同一个index中不同type是存储在同一个索引中的(lucene的索引文件),因此不同type中相同名字的字段的定义(mapping)必须一致。
  2. 当您想要索引一个deleted字段在不同的type中数据类型不一样。一个类型中为日期字段,另外一个类型中为布尔字段时,这可能会导致ES的存储失败,因为这影响了ES的初衷设计。
  3. 另外,在一个index中建立很多实体,type,没有相同的字段,会导致数据稀疏,最终结果是干扰了Lucene有效压缩文档的能力,说白了就是影响ES的存储、检索效率。

es6.x和7.x对比

如何分片, 几个节点?

节点数<=主分片数*(副本数)

3个节点,3个主分片,1个副本数

可以简单设置成节点个数的倍数,以便在节点之间均匀分布数据

后续的高级操作可以考虑根据索引内容进行分片,比如根据时间进行分片等。

萌新发问

  1. logging:
  2. config: classpath:logback-spring.xml
  3. elasticsearch:
  4. # 多个IP逗号隔开
  5. hosts: 10.7.176.72:9200,10.7.176.73:9200,10.7.176.74:9200
  6. username: elastic
  7. password: Mvwm@n12nal
  1. private void createIndexDetail(String indexName) throws IOException {
  2. // 3.创建索引
  3. CreateIndexResponse response = elasticsearchClient
  4. .indices()
  5. .create(c -> c
  6. .index(indexName)
  7. .settings(sBuilder -> sBuilder
  8. .index(iBuilder -> iBuilder
  9. // 5️个分片
  10. .numberOfShards("5")
  11. // 一个副本
  12. .numberOfReplicas("1")))
  13. .mappings(mBuilder -> mBuilder
  14. .properties(OrderDetail.ORDER_NO, pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(30)))
  15. .properties(OrderDetail.MESSAGE, pBuilder -> pBuilder.text(textPropertyBuilder -> textPropertyBuilder.analyzer("ik_max_word").searchAnalyzer("ik_smart")))
  16. )
  17. );
  18. log.info("createIndexDetail方法,acknowledged={}", response.acknowledged());
  19. }

这是我在springboot中对es8集群的配置,我想知道当我创建一个索引时,是三个hosts节点都会有我的索引吗?

5个分片,每个分片一个副本分片,所以也有5个副本分片,同一个主分片和副本分片不会在一台host节点上,然后集群会将这些分片打散分布在集群中。

加入我往这个索引中插入100条数据,数据大概是如何分布的呢

假如我往这个索引中插入100条数据,数据大概是如何分布的呢?

大概意思就是说,100条数据在5个主分片上,相当于存放在mysql表中,只是这个表的数据不在一个磁盘上

然后100备份数据放在副本分片上。 相当于主从mysql库

java代码升级

es6.8升级到es7.1.1

采用方式:spring Data Elasticsearch 方式

因为当前springboot版本支持7.1.1,所以采用此方式

版本对应

由于集团使用的是es7.1.1版本,所以我们对应使用spring Data Elasticsearch 4.0.x

相关api

QueryBuilders.matchQuery():全文查询的参数,会进行分词操作

QueryBuilders.termQuery():精确查询的参数

参考文档

springboot3.4迁移到4.0官方指南

es6.8升级到es8.7.0

采用方式:es8.7原生spi

由于升级spring boot成本过大,为了和其他微服务的spring boot版本保持一致,故采用此方式

参考文档:

Elasticsearch Java API Client官方文档

Springboot整合ES8(Java API Client)

springboo整合elasticSearch8 java client api

ElasticSearch中文博客

ES8(Java API Client)查询详解

接口示例:

  1. package com.djbx.dh.search.controller.dhwiki;
  2. import co.elastic.clients.elasticsearch.ElasticsearchClient;
  3. import co.elastic.clients.elasticsearch._types.FieldSort;
  4. import co.elastic.clients.elasticsearch._types.SortOptions;
  5. import co.elastic.clients.elasticsearch._types.SortOrder;
  6. import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
  7. import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
  8. import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
  9. import co.elastic.clients.elasticsearch._types.query_dsl.WildcardQuery;
  10. import co.elastic.clients.elasticsearch.core.GetResponse;
  11. import co.elastic.clients.elasticsearch.core.SearchResponse;
  12. import co.elastic.clients.elasticsearch.core.search.Hit;
  13. import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
  14. import com.alibaba.nacos.client.naming.utils.CollectionUtils;
  15. import com.djbx.dh.common.exception.BusinessException;
  16. import com.djbx.dh.common.model.po.ResultCode;
  17. import com.djbx.dh.common.model.vo.ResultEntity;
  18. import com.djbx.dh.search.common.constants.CommonConstants;
  19. import com.djbx.dh.search.common.enums.ValidOrInvalidEnum;
  20. import com.djbx.dh.search.common.enums.YesOrNoEnum;
  21. import com.djbx.dh.search.common.util.UUIDUtil;
  22. import com.djbx.dh.search.entity.dto.IdDTO;
  23. import com.djbx.dh.search.entity.dto.dhwiki.CommunityEncyclopediaQuestionAnswerDTO;
  24. import com.djbx.dh.search.entity.dto.dhwiki.ProductLibraryDTO;
  25. import com.djbx.dh.search.entity.dto.dhwiki.QuestionAnswerUpdateDTO;
  26. import com.djbx.dh.search.entity.po.dhwiki.CommunityEncyclopediaQuestionAnswer;
  27. import com.djbx.dh.search.entity.po.dhwiki.CommunityEncyclopediaQuestionCategory;
  28. import com.djbx.dh.search.entity.vo.dhwiki.CommunityEncyclopediaQuestionAnswerVO;
  29. import com.djbx.dh.search.service.dhwiki.CommunityEncyclopediaQuestionCategoryService;
  30. import io.swagger.annotations.Api;
  31. import io.swagger.annotations.ApiOperation;
  32. import lombok.extern.slf4j.Slf4j;
  33. import org.apache.commons.lang3.StringUtils;
  34. import org.springframework.web.bind.annotation.PostMapping;
  35. import org.springframework.web.bind.annotation.RequestBody;
  36. import org.springframework.web.bind.annotation.RequestMapping;
  37. import org.springframework.web.bind.annotation.RestController;
  38. import javax.annotation.Resource;
  39. import java.io.IOException;
  40. import java.util.ArrayList;
  41. import java.util.Date;
  42. import java.util.HashMap;
  43. import java.util.List;
  44. import java.util.Map;
  45. import java.util.Objects;
  46. import java.util.stream.Collectors;
  47. /**
  48. * @author DJ033979
  49. */
  50. @Slf4j
  51. @RestController
  52. @RequestMapping("/encyclopedia")
  53. @Api(tags = "搭伙百科")
  54. public class CommunityEncyclopediaController {
  55. @Resource
  56. private CommunityEncyclopediaQuestionCategoryService communityEncyclopediaQuestionCategoryService;
  57. @Resource
  58. private ElasticsearchClient elasticsearchClient;
  59. /**
  60. * 保存数据
  61. *
  62. * @param questionAnswer 问答数据
  63. * @return ResultEntity
  64. */
  65. @PostMapping("/saveOrUpdate")
  66. @ApiOperation(value = "问答数据保存OR更新")
  67. public ResultEntity<?> saveOrUpdate(@RequestBody CommunityEncyclopediaQuestionAnswer questionAnswer) throws BusinessException, IOException {
  68. //前置校验,判断索引是否存在
  69. if (!elasticsearchClient.indices().exists(e -> e.index(CommonConstants.WIKI_INDEX)).value()) {
  70. // 创建索引
  71. createIndexDetail();
  72. }
  73. // 置顶或推荐判断排序值是否可用
  74. if (YesOrNoEnum.isYes(questionAnswer.getIsCategoryTop()) && findValidOneByTypeAndStatusAndCategoryAndTopIndex(
  75. ValidOrInvalidEnum.VALID.getValue(), YesOrNoEnum.YES.getValue(), questionAnswer.getCategory(), questionAnswer.getTopIndex())) {
  76. log.error("有效的置顶顺序被占用,不能使用");
  77. return ResultEntity.error("有效的置顶顺序被占用,不能使用");
  78. }
  79. if (YesOrNoEnum.isYes(questionAnswer.getIsHotRecommendTop()) && findValidOneByTypeAndStatusAndCategoryAndTopIndex(
  80. ValidOrInvalidEnum.VALID.getValue(), YesOrNoEnum.YES.getValue(), questionAnswer.getCategory(), questionAnswer.getTopIndex())) {
  81. log.error("有效的推荐顺序被占用,不能使用");
  82. return ResultEntity.error("有效的推荐顺序被占用,不能使用");
  83. }
  84. // 此字段用于管理后台like查询
  85. questionAnswer.setManagerQuestion(questionAnswer.getQuestion());
  86. // 保存
  87. if (StringUtils.isBlank(questionAnswer.getId())) {
  88. //设置创建时间
  89. questionAnswer.setId(UUIDUtil.uuid());
  90. questionAnswer.setType(YesOrNoEnum.YES.getValue());
  91. questionAnswer.setCreateTime(new Date());
  92. } else {// 更新
  93. // 根据id获取es的数据
  94. CommunityEncyclopediaQuestionAnswer encyclopediaQuestionAnswer = getCommunityEncyclopediaQuestionAnswerById(questionAnswer.getId());
  95. // 设置更新数据
  96. questionAnswer.setCreateTime(encyclopediaQuestionAnswer.getCreateTime());
  97. questionAnswer.setType(encyclopediaQuestionAnswer.getType());
  98. questionAnswer.setUpdateTime(new Date());
  99. }
  100. try {
  101. elasticsearchClient.index(s ->
  102. s.index(CommonConstants.WIKI_INDEX)
  103. .id(questionAnswer.getId())
  104. .document(questionAnswer)
  105. );
  106. } catch (Exception e) {
  107. log.error("问答数据保存OR更新至ES报错:{}", e.getMessage());
  108. return ResultEntity.error("问答数据保存OR更新至ES报错");
  109. }
  110. return ResultEntity.success("问答数据保存OR更新至ES成功");
  111. }
  112. /**
  113. * 根据问答ID查询问答数据记录
  114. *
  115. * @param dto 问答ID
  116. * @return CommunityEncyclopediaQuestionAnswer
  117. */
  118. @PostMapping("/getById")
  119. @ApiOperation(value = "根据ID查询")
  120. public ResultEntity<?> getById(@RequestBody IdDTO dto) throws BusinessException {
  121. if (null == dto || StringUtils.isBlank(dto.getId())) {
  122. log.error("百科问答ID不能为NULL");
  123. return ResultEntity.error("百科问答ID不能为空和NULL");
  124. }
  125. return ResultEntity.success(getCommunityEncyclopediaQuestionAnswerById(dto.getId()));
  126. }
  127. /**
  128. * 百科问答上下线操作
  129. */
  130. @PostMapping("/updateStatus")
  131. @ApiOperation(value = "百科问答上下线操作")
  132. public ResultEntity<?> updateStatus(@RequestBody QuestionAnswerUpdateDTO dto) throws BusinessException, IOException {
  133. if (null == dto || StringUtils.isBlank(dto.getId()) || null == dto.getStatus()) {
  134. log.error("百科问答ID或发布状态不能为空和NULL");
  135. return ResultEntity.error("百科问答ID或发布状态不能为空和NULL");
  136. }
  137. String id = dto.getId();
  138. Integer status = dto.getStatus();
  139. CommunityEncyclopediaQuestionAnswer encyclopediaQuestionAnswer = getCommunityEncyclopediaQuestionAnswerById(id);
  140. if (status.equals(encyclopediaQuestionAnswer.getStatus())) {
  141. log.error("百科问答当前状态已为上下线的操作状态");
  142. return ResultEntity.error("百科问答当前状态已为上下线的操作状态");
  143. }
  144. // 上线时,判断置顶排序和推荐顺序是否占用
  145. if (YesOrNoEnum.isYes(status) && null != encyclopediaQuestionAnswer.getTopIndex() && findValidOneByTypeAndStatusAndCategoryAndTopIndex(
  146. ValidOrInvalidEnum.VALID.getValue(), status, encyclopediaQuestionAnswer.getCategory(), encyclopediaQuestionAnswer.getTopIndex())) {
  147. log.error("有效的置顶顺序被占用,不能使用");
  148. return ResultEntity.error("有效的置顶顺序被占用,不能使用");
  149. }
  150. if (YesOrNoEnum.isYes(status) && null != encyclopediaQuestionAnswer.getRecommendIndex() && findValidOneByTypeAndStatusAndRecommendIndex(
  151. ValidOrInvalidEnum.VALID.getValue(), status, encyclopediaQuestionAnswer.getRecommendIndex())) {
  152. log.error("有效的推荐顺序被占用,不能使用");
  153. return ResultEntity.error("有效的推荐顺序被占用,不能使用");
  154. }
  155. // 更新上下线状态
  156. encyclopediaQuestionAnswer.setStatus(status);
  157. encyclopediaQuestionAnswer.setUpdateTime(new Date());
  158. try {
  159. elasticsearchClient.update(e -> e
  160. .index(CommonConstants.WIKI_INDEX)
  161. .id(id)
  162. .doc(encyclopediaQuestionAnswer), CommunityEncyclopediaQuestionAnswer.class
  163. );
  164. } catch (Exception e) {
  165. log.error("百科问答上下线操作失败:{}", e.getMessage());
  166. return ResultEntity.error("百科问答上下线操作失败");
  167. }
  168. return ResultEntity.success("百科问答上下线操作成功");
  169. }
  170. /**
  171. * 问题分类上下线而更新该分类下所有问答的上下线状态
  172. */
  173. @PostMapping("/updateStatusByCategory")
  174. @ApiOperation(value = "问题分类上下线同步百科问答上下线状态")
  175. public ResultEntity<?> updateStatusByCategory(@RequestBody Map<String, Object> map) throws BusinessException, IOException {
  176. Integer category = (Integer) map.get("category");
  177. Integer status = (Integer) map.get("status");
  178. List<CommunityEncyclopediaQuestionAnswer> encyclopediaQuestionAnswers = findValidListByTypeAndCategory(
  179. ValidOrInvalidEnum.VALID.getValue(), category);
  180. try {
  181. if (!CollectionUtils.isEmpty(encyclopediaQuestionAnswers)) {
  182. for (CommunityEncyclopediaQuestionAnswer encyclopediaQuestionAnswer : encyclopediaQuestionAnswers) {
  183. // 上线时,判断推荐顺序是否占用,如占用设置为不是热门推荐
  184. if (YesOrNoEnum.isYes(status) && null != encyclopediaQuestionAnswer.getRecommendIndex() && findValidOneByTypeAndStatusAndRecommendIndex(
  185. ValidOrInvalidEnum.VALID.getValue(), status, encyclopediaQuestionAnswer.getRecommendIndex())) {
  186. encyclopediaQuestionAnswer.setIsHotRecommend(YesOrNoEnum.NO.getValue());
  187. encyclopediaQuestionAnswer.setIsHotRecommendTop(YesOrNoEnum.NO.getValue());
  188. encyclopediaQuestionAnswer.setRecommendIndex(null);
  189. }
  190. // 更新上下线状态
  191. encyclopediaQuestionAnswer.setStatus(status);
  192. encyclopediaQuestionAnswer.setUpdateTime(new Date());
  193. elasticsearchClient.update(e -> e
  194. .index(CommonConstants.WIKI_INDEX)
  195. .id(encyclopediaQuestionAnswer.getId())
  196. .doc(encyclopediaQuestionAnswer), CommunityEncyclopediaQuestionAnswer.class
  197. );
  198. }
  199. }
  200. } catch (Exception e) {
  201. log.error("该问题分类下的百科问答上下线操作失败:{}", e.getMessage());
  202. return ResultEntity.error("该问题分类下的百科问答上下线操作失败");
  203. }
  204. return ResultEntity.success("该问题分类下的百科问答上下线操作成功");
  205. }
  206. /**
  207. * 根据问答ID删除问答数据记录
  208. */
  209. @PostMapping("/delete")
  210. @ApiOperation(value = "根据ID删除百科问答")
  211. public ResultEntity<?> delete(@RequestBody IdDTO dto) throws BusinessException {
  212. if (null == dto || StringUtils.isBlank(dto.getId())) {
  213. log.error("百科问答ID不能为空和NULL");
  214. return ResultEntity.error("百科问答ID不能为空和NULL");
  215. }
  216. CommunityEncyclopediaQuestionAnswer encyclopediaQuestionAnswer = getCommunityEncyclopediaQuestionAnswerById(dto.getId());
  217. // 更新有效无效,逻辑删除
  218. encyclopediaQuestionAnswer.setType(ValidOrInvalidEnum.INVALID.getValue());
  219. // 去除占用的排序值,设置为默认255
  220. encyclopediaQuestionAnswer.setTopIndex(null);
  221. encyclopediaQuestionAnswer.setRecommendIndex(null);
  222. encyclopediaQuestionAnswer.setUpdateTime(new Date());
  223. try {
  224. elasticsearchClient.update(e -> e
  225. .index(CommonConstants.WIKI_INDEX)
  226. .id(dto.getId())
  227. .doc(encyclopediaQuestionAnswer), CommunityEncyclopediaQuestionAnswer.class
  228. );
  229. } catch (Exception e) {
  230. log.error("删除百科问答操作失败:{}", e.getMessage());
  231. return ResultEntity.error("删除百科问答操作失败");
  232. }
  233. return ResultEntity.success("删除百科问答操作成功");
  234. }
  235. /**
  236. * 取消推荐
  237. */
  238. @PostMapping("/cancelRecommend")
  239. @ApiOperation(value = "取消推荐")
  240. public ResultEntity<?> cancelRecommend(@RequestBody IdDTO dto) throws BusinessException {
  241. if (null == dto || StringUtils.isBlank(dto.getId())) {
  242. log.error("百科问答ID不能为空和NULL");
  243. return ResultEntity.error("百科问答ID不能为空和NULL");
  244. }
  245. CommunityEncyclopediaQuestionAnswer encyclopediaQuestionAnswer = getCommunityEncyclopediaQuestionAnswerById(dto.getId());
  246. // 取消推荐
  247. encyclopediaQuestionAnswer.setIsHotRecommend(YesOrNoEnum.NO.getValue());
  248. encyclopediaQuestionAnswer.setIsHotRecommendTop(YesOrNoEnum.NO.getValue());
  249. encyclopediaQuestionAnswer.setRecommendIndex(null);
  250. encyclopediaQuestionAnswer.setUpdateTime(new Date());
  251. try {
  252. elasticsearchClient.update(e -> e
  253. .index(CommonConstants.WIKI_INDEX)
  254. .id(dto.getId())
  255. .doc(encyclopediaQuestionAnswer), CommunityEncyclopediaQuestionAnswer.class
  256. );
  257. } catch (Exception e) {
  258. log.error("取消推荐操作失败:{}", e.getMessage());
  259. return ResultEntity.error("取消推荐操作失败");
  260. }
  261. return ResultEntity.success("取消推荐操作成功");
  262. }
  263. // todo user服务接口指向有问题,应该是 user服务的接口写错了,应和search服务保持一致,待修改
  264. /**
  265. * 管理后台查询
  266. * 多条件查询,分页,过滤,排序
  267. */
  268. @PostMapping("/listEncyclopediaQuestionAnswer")
  269. @ApiOperation(value = "分页查询百科问答列表")
  270. public ResultEntity<?> listEncyclopediaQuestionAnswer(@RequestBody CommunityEncyclopediaQuestionAnswerDTO questionAnswerDTO) throws IOException {
  271. BoolQuery.Builder boolQuery = QueryBuilders.bool()
  272. .must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(1))._toQuery());
  273. if (!StringUtils.isBlank(questionAnswerDTO.getQuestion())) {
  274. boolQuery.must(WildcardQuery.of(w -> w.field(CommunityEncyclopediaQuestionAnswer.FIELD_MANAGER_QUESTION).value("*" + questionAnswerDTO.getQuestion() + "*"))._toQuery());
  275. }
  276. if (null != questionAnswerDTO.getCategory()) {
  277. boolQuery.must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_CATEGORY).value(questionAnswerDTO.getCategory()))._toQuery());
  278. }
  279. if (null != questionAnswerDTO.getIsCategoryTop()) {
  280. boolQuery.must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_IS_CATEGORY_TOP).value(questionAnswerDTO.getIsCategoryTop()))._toQuery());
  281. }
  282. if (null != questionAnswerDTO.getIsHotRecommend()) {
  283. boolQuery.must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_IS_HOT_RECOMMEND).value(questionAnswerDTO.getIsHotRecommend()))._toQuery());
  284. }
  285. if (null != questionAnswerDTO.getStatus()) {
  286. boolQuery.must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(questionAnswerDTO.getStatus()))._toQuery());
  287. }
  288. SearchResponse<CommunityEncyclopediaQuestionAnswer> search = elasticsearchClient.search(s -> s
  289. .index(CommonConstants.WIKI_INDEX)
  290. .query(q -> q
  291. .bool(boolQuery.build())
  292. )
  293. .from(questionAnswerDTO.getPageNo() - 1)
  294. .size(questionAnswerDTO.getPageSize())
  295. .sort(sortOptionsBuilder -> sortOptionsBuilder
  296. .field(fieldSortBuilder -> fieldSortBuilder
  297. .field(CommunityEncyclopediaQuestionAnswer.FIELD_TOP_INDEX).order(SortOrder.Asc)
  298. .field(CommunityEncyclopediaQuestionAnswer.FIELD_NORMAL_INDEX).order(SortOrder.Asc)
  299. .field(CommunityEncyclopediaQuestionAnswer.FIELD_CREATE_TIME).order(SortOrder.Desc)))
  300. , CommunityEncyclopediaQuestionAnswer.class);
  301. List<CommunityEncyclopediaQuestionAnswer> questionAnswerList = search.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  302. List<CommunityEncyclopediaQuestionCategory> questionCategoryList = communityEncyclopediaQuestionCategoryService.listCommunityEncyclopediaQuestionCategory();
  303. if (null != questionCategoryList && !questionCategoryList.isEmpty()) {
  304. List<Integer> questionCategoryIdList = questionCategoryList.stream().map(CommunityEncyclopediaQuestionCategory::getId).collect(Collectors.toList());
  305. questionAnswerList = questionAnswerList.stream()
  306. .filter(questionAnswer -> questionCategoryIdList.contains(questionAnswer.getCategory()))
  307. .collect(Collectors.toList());
  308. } else {
  309. questionAnswerList = new ArrayList<>();
  310. }
  311. Map<String, Object> map = new HashMap<>(16);
  312. map.put("total", Objects.requireNonNull(search.hits().total()).value());
  313. map.put("dataList", questionAnswerList);
  314. return ResultEntity.success(map);
  315. }
  316. /**
  317. * APP查询
  318. * 根据问题关键词和问题类别查询问答列表
  319. */
  320. @PostMapping(value = "/listCommunityEncyclopediaQuestionAnswer")
  321. @ApiOperation(value = "APP百科问答搜索")
  322. public ResultEntity<?> listCommunityEncyclopediaQuestionAnswer(@RequestBody CommunityEncyclopediaQuestionAnswerDTO questionAnswerDTO) throws IOException {
  323. // 设置多条件
  324. BoolQuery.Builder boolQuery = QueryBuilders.bool()
  325. .must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(YesOrNoEnum.YES.getValue()))._toQuery())
  326. .must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(YesOrNoEnum.YES.getValue()))._toQuery());
  327. if (!StringUtils.isBlank(questionAnswerDTO.getQuestion())) {
  328. boolQuery.must(WildcardQuery.of(w -> w.field(CommunityEncyclopediaQuestionAnswer.FIELD_QUESTION).value(questionAnswerDTO.getQuestion()))._toQuery());
  329. }
  330. SortOptions.Builder sortBuilder = new SortOptions.Builder();
  331. if (null != questionAnswerDTO.getCategory()) {
  332. // 热门问答
  333. if (0 == questionAnswerDTO.getCategory()) {
  334. boolQuery.must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_IS_HOT_RECOMMEND).value(YesOrNoEnum.YES.getValue()))._toQuery());
  335. sortBuilder.field(FieldSort.of(f -> f.field(CommunityEncyclopediaQuestionAnswer.FIELD_RECOMMEND_INDEX).order(SortOrder.Asc)));
  336. // 分类别查询
  337. } else {
  338. boolQuery.must(TermQuery.of(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_CATEGORY).value(questionAnswerDTO.getCategory()))._toQuery());
  339. sortBuilder.field(FieldSort.of(f -> f.field(CommunityEncyclopediaQuestionAnswer.FIELD_TOP_INDEX).order(SortOrder.Asc)));
  340. }
  341. }
  342. // 设置排序字段
  343. sortBuilder.field(FieldSort.of(f -> f.field(CommunityEncyclopediaQuestionAnswer.FIELD_NORMAL_INDEX).order(SortOrder.Asc)));
  344. sortBuilder.field(FieldSort.of(f -> f.field(CommunityEncyclopediaQuestionAnswer.FIELD_CREATE_TIME).order(SortOrder.Desc)));
  345. SearchResponse<CommunityEncyclopediaQuestionAnswer> search = elasticsearchClient.search(s -> s
  346. .index(CommonConstants.WIKI_INDEX)
  347. .query(q -> q
  348. .bool(boolQuery.build())
  349. )
  350. .from(questionAnswerDTO.getPageNo() - 1)
  351. .size(questionAnswerDTO.getPageSize())
  352. .sort(sortBuilder.build())
  353. , CommunityEncyclopediaQuestionAnswer.class);
  354. List<CommunityEncyclopediaQuestionCategory> categoryList = communityEncyclopediaQuestionCategoryService.getAllList();
  355. List<CommunityEncyclopediaQuestionAnswer> questionAnswerList = search.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  356. List<CommunityEncyclopediaQuestionAnswerVO> questionAnswerVOList = questionAnswerList.stream()
  357. .map(questionAnswer -> {
  358. CommunityEncyclopediaQuestionAnswerVO questionAnswerVO = new CommunityEncyclopediaQuestionAnswerVO();
  359. questionAnswerVO.setId(questionAnswer.getId());
  360. questionAnswerVO.setQuestion(questionAnswer.getQuestion());
  361. questionAnswerVO.setAnswer(questionAnswer.getAnswer());
  362. if (categoryList != null && !categoryList.isEmpty()) {
  363. for (CommunityEncyclopediaQuestionCategory communityEncyclopediaQuestionCategory : categoryList) {
  364. if (communityEncyclopediaQuestionCategory.getId().equals(questionAnswer.getCategory())) {
  365. questionAnswerVO.setCategoryTitle(communityEncyclopediaQuestionCategory.getTitle());
  366. break;
  367. }
  368. }
  369. }
  370. return questionAnswerVO;
  371. }).collect(Collectors.toList());
  372. Map<String, Object> map = new HashMap<>(16);
  373. map.put("total", Objects.requireNonNull(search.hits().total()).value());
  374. map.put("dataList", questionAnswerVOList);
  375. return ResultEntity.success(map);
  376. }
  377. private CommunityEncyclopediaQuestionAnswer getCommunityEncyclopediaQuestionAnswerById(String id) throws BusinessException {
  378. if (null == id) {
  379. log.error("问答ID不能为NULL");
  380. throw new BusinessException(ResultCode.ERROR.getCode(), "问答ID不能为NULL");
  381. }
  382. GetResponse<CommunityEncyclopediaQuestionAnswer> optionalById;
  383. try {
  384. // 根据问答ID查询到问答数据记录
  385. optionalById = elasticsearchClient.get(s -> s.index(CommonConstants.WIKI_INDEX).id(id), CommunityEncyclopediaQuestionAnswer.class);
  386. } catch (Exception e) {
  387. log.error("es服务异常,请联系管理员:{}", e.getMessage());
  388. throw new BusinessException(ResultCode.ERROR.getCode(), "es服务异常,请联系管理员");
  389. }
  390. if (!optionalById.found()) {
  391. log.error("根据ID未查询到对应问答数据记录");
  392. throw new BusinessException(ResultCode.ERROR.getCode(), "根据问答ID未查询到问答数据记录");
  393. }
  394. CommunityEncyclopediaQuestionAnswer communityEncyclopediaQuestionAnswer = optionalById.source();
  395. assert communityEncyclopediaQuestionAnswer != null;
  396. if (!ValidOrInvalidEnum.isValid(communityEncyclopediaQuestionAnswer.getType())) {
  397. log.error("根据ID查询的问答数据无效");
  398. throw new BusinessException(ResultCode.ERROR.getCode(), "根据ID查询的问答数据无效");
  399. }
  400. return communityEncyclopediaQuestionAnswer;
  401. }
  402. /**
  403. * 查询有效的百科问答列表
  404. */
  405. @PostMapping("/findValidListByType")
  406. @ApiOperation(value = "查询有效的百科问答列表")
  407. public ResultEntity<?> findValidList() throws BusinessException {
  408. try {
  409. // 根据问答ID查询到问答数据记录
  410. List<CommunityEncyclopediaQuestionAnswer> encyclopediaQuestionAnswerList = findValidListByType(ValidOrInvalidEnum.VALID.getValue());
  411. return ResultEntity.success(encyclopediaQuestionAnswerList);
  412. } catch (Exception e) {
  413. log.error("es服务异常,请联系管理员:{}", e.getMessage());
  414. return ResultEntity.error("es服务异常,请联系管理员");
  415. }
  416. }
  417. /**
  418. * 根据问答分类查询有效上线置顶的百科问答
  419. */
  420. @PostMapping("/findValidListByCategory")
  421. @ApiOperation(value = "根据问答分类查询有效上线置顶的百科问答")
  422. public ResultEntity<?> findValidListByCategory(@RequestBody CommunityEncyclopediaQuestionAnswerDTO dto) throws BusinessException {
  423. if (null == dto || null == dto.getCategory()) {
  424. log.error("百科问答类别不能为NULL");
  425. return ResultEntity.error("百科问答类别不能为NULL");
  426. }
  427. try {
  428. // 根据问答ID查询到问答数据记录
  429. List<CommunityEncyclopediaQuestionAnswer> questionAnswerList = findValidListByTypeAndStatusAndCategoryAndIsCategoryTop(ValidOrInvalidEnum.VALID.getValue(),
  430. YesOrNoEnum.YES.getValue(), dto.getCategory(), YesOrNoEnum.YES.getValue());
  431. return ResultEntity.success(questionAnswerList);
  432. } catch (Exception e) {
  433. log.error("es服务异常,请联系管理员:{}", e.getMessage());
  434. return ResultEntity.error("es服务异常,请联系管理员");
  435. }
  436. }
  437. /**
  438. * 查询有效上线推荐的百科问答
  439. */
  440. @PostMapping("/findValidListByRecommend")
  441. @ApiOperation(value = "查询有效上线推荐的百科问答")
  442. public ResultEntity<?> findValidListByRecommend() throws BusinessException {
  443. try {
  444. List<CommunityEncyclopediaQuestionAnswer> questionAnswerList =
  445. findValidListByTypeAndStatusAndIsHotRecommend(ValidOrInvalidEnum.VALID.getValue(),
  446. YesOrNoEnum.YES.getValue(), YesOrNoEnum.YES.getValue());
  447. return ResultEntity.success(questionAnswerList);
  448. } catch (Exception e) {
  449. log.error("es服务异常,请联系管理员:{}", e.getMessage());
  450. return ResultEntity.error("es服务异常,请联系管理员");
  451. }
  452. }
  453. /**
  454. * 查询有效上线关联产品productCode的百科问答
  455. */
  456. @PostMapping("/findValidListByProductCode")
  457. @ApiOperation(value = "查询有效上线关联产品productCode的百科问答")
  458. public ResultEntity<?> findValidListByProductCode(@RequestBody ProductLibraryDTO dto) throws BusinessException {
  459. try {
  460. List<CommunityEncyclopediaQuestionAnswer> questionAnswerList =
  461. findValidListByTypeAndStatusAndProductCode(ValidOrInvalidEnum.VALID.getValue(),
  462. YesOrNoEnum.YES.getValue(), dto.getProductCode());
  463. return ResultEntity.success(questionAnswerList);
  464. } catch (Exception e) {
  465. log.error("es服务异常,请联系管理员:{}", e.getMessage());
  466. return ResultEntity.error("es服务异常,请联系管理员");
  467. }
  468. }
  469. private boolean findValidOneByTypeAndStatusAndCategoryAndTopIndex(Integer type, Integer status, Integer category, Integer topIndex) throws IOException {
  470. return Objects.requireNonNull(elasticsearchClient.search(s -> s
  471. .index(CommonConstants.WIKI_INDEX)
  472. .query(q -> q
  473. .bool(b -> b
  474. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  475. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(status)))
  476. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_CATEGORY).value(category)))
  477. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TOP_INDEX).value(topIndex)))
  478. )
  479. ), CommunityEncyclopediaQuestionAnswer.class).hits().total()).value() > 0;
  480. }
  481. private boolean findValidOneByTypeAndStatusAndRecommendIndex(Integer type, Integer status, Integer recommendIndex) throws IOException {
  482. return Objects.requireNonNull(elasticsearchClient.search(s -> s
  483. .index(CommonConstants.WIKI_INDEX)
  484. .query(q -> q
  485. .bool(b -> b
  486. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  487. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(status)))
  488. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_RECOMMEND_INDEX).value(recommendIndex)))
  489. )
  490. ), CommunityEncyclopediaQuestionAnswer.class).hits().total()).value() > 0;
  491. }
  492. private List<CommunityEncyclopediaQuestionAnswer> findValidListByTypeAndCategory(Integer type, Integer category) throws IOException {
  493. return elasticsearchClient.search(s -> s
  494. .index(CommonConstants.WIKI_INDEX)
  495. .query(q -> q
  496. .bool(b -> b
  497. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  498. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_CATEGORY).value(category)))
  499. )
  500. ), CommunityEncyclopediaQuestionAnswer.class).hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  501. }
  502. private List<CommunityEncyclopediaQuestionAnswer> findValidListByType(Integer type) throws IOException {
  503. return elasticsearchClient.search(s -> s
  504. .index(CommonConstants.WIKI_INDEX)
  505. .query(q -> q
  506. .bool(b -> b
  507. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  508. )
  509. ), CommunityEncyclopediaQuestionAnswer.class).hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  510. }
  511. private List<CommunityEncyclopediaQuestionAnswer> findValidListByTypeAndStatusAndCategoryAndIsCategoryTop(Integer type, Integer status, Integer category, Integer categoryTop) throws IOException {
  512. return elasticsearchClient.search(s -> s
  513. .index(CommonConstants.WIKI_INDEX)
  514. .query(q -> q
  515. .bool(b -> b
  516. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  517. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(status)))
  518. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_CATEGORY).value(category)))
  519. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_IS_CATEGORY_TOP).value(categoryTop)))
  520. )
  521. ), CommunityEncyclopediaQuestionAnswer.class).hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  522. }
  523. private List<CommunityEncyclopediaQuestionAnswer> findValidListByTypeAndStatusAndIsHotRecommend(Integer type, Integer status, Integer isHotRecommend) throws IOException {
  524. return elasticsearchClient.search(s -> s
  525. .index(CommonConstants.WIKI_INDEX)
  526. .query(q -> q
  527. .bool(b -> b
  528. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  529. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(status)))
  530. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_IS_HOT_RECOMMEND).value(isHotRecommend)))
  531. )
  532. ), CommunityEncyclopediaQuestionAnswer.class).hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  533. }
  534. private List<CommunityEncyclopediaQuestionAnswer> findValidListByTypeAndStatusAndProductCode(Integer type, Integer status, String productCode) throws IOException {
  535. return elasticsearchClient.search(s -> s
  536. .index(CommonConstants.WIKI_INDEX)
  537. .query(q -> q
  538. .bool(b -> b
  539. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE).value(type)))
  540. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS).value(status)))
  541. .must(m -> m.term(t -> t.field(CommunityEncyclopediaQuestionAnswer.FIELD_PRODUCT_CODE).value(productCode)))
  542. )
  543. ), CommunityEncyclopediaQuestionAnswer.class).hits().hits().stream().map(Hit::source).collect(Collectors.toList());
  544. }
  545. private void createIndexDetail() throws IOException {
  546. CreateIndexResponse response = elasticsearchClient
  547. .indices()
  548. .create(c -> c
  549. .index(CommonConstants.WIKI_INDEX)
  550. .settings(sBuilder -> sBuilder
  551. .index(iBuilder -> iBuilder
  552. // 三个分片
  553. .numberOfShards("5")
  554. // 一个副本
  555. .numberOfReplicas("1")))
  556. .mappings(mBuilder -> mBuilder
  557. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_ID, pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256)))
  558. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_QUESTION, pBuilder -> pBuilder.text(tBuilder -> tBuilder.analyzer("ik_max_word").searchAnalyzer("ik_smart")))
  559. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_MANAGER_QUESTION, pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256)))
  560. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_ANSWER, pBuilder -> pBuilder.text(tBuilder -> tBuilder.analyzer("ik_max_word").searchAnalyzer("ik_smart")))
  561. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_CATEGORY, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  562. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_STATUS, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  563. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_NORMAL_INDEX, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  564. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_IS_CATEGORY_TOP, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  565. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_TOP_INDEX, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  566. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_IS_HOT_RECOMMEND, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  567. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_IS_HOT_RECOMMEND_TOP, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  568. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_RECOMMEND_INDEX, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  569. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_TYPE, pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder))
  570. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_OPERATE_REAL_NAME, pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256)))
  571. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_OPERATE_DJ_CODE, pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256)))
  572. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_CREATE_TIME, pBuilder -> pBuilder.date(datePropertyBuilder -> datePropertyBuilder.format("yyyy-MM-dd HH:mm:ss")))
  573. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_UPDATE_TIME, pBuilder -> pBuilder.date(datePropertyBuilder -> datePropertyBuilder.format("yyyy-MM-dd HH:mm:ss")))
  574. .properties(CommunityEncyclopediaQuestionAnswer.FIELD_PRODUCT_CODE, pBuilder -> pBuilder.keyword(keywordPropertyBuilder -> keywordPropertyBuilder.ignoreAbove(256)))
  575. )
  576. );
  577. log.info("createIndexDetail方法,acknowledged={}", response.acknowledged());
  578. }
  579. }

es8代码升级遇到的问题

pom.xml 关于 jackson 的版本不兼容问题

引入依赖后服务无法正常启动

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. <version>2.13.3</version>
  5. </dependency>

通过mvn命名进行排查

mvn dependency:tree

观察依赖树发现,可能是 jackson 2.13版本下其他依赖可能和已有版本存在冲突

解决方案

将 依赖保持和已有依赖一致

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.core</groupId>
  3. <artifactId>jackson-databind</artifactId>
  4. <version>2.11.4</version>
  5. </dependency>
ElasticSearch查询报错JsonpMappingException

说明某些字段无法转换,根据报错信息排查

重新导入es数据,删除version、timestamp 两个额外字段即可

或者使用兜底方案(不一定成):

在实体类上加注解

@JsonIgnoreProperties(ignoreUnknown=true)

ElasticSearch查询报错JsonpMappingException

WildcardQuery 查询不到数据
  1. Query wildcardQuery = WildcardQuery.of(w -> w.field("managerQuestion").value("*" + questionAnswerDTO.getQuestion() + "*"))._toQuery();
  2. SearchResponse<CommunityEncyclopediaQuestionAnswer> search = elasticsearchClient.search(s -> s
  3. .index("community_encyclopedia")
  4. .query(q -> q
  5. .bool(b -> b
  6. .must(type)
  7. .must(wildcardQuery)
  8. )
  9. ), CommunityEncyclopediaQuestionAnswer.class);

将字段 managerQuestion 改为 managerQuestion.keyword

查询后Date类型 的 @JsonFormat 未生效

待解决

es间数据迁移

迁移方式对比

参考链接:三种常用的 Elasticsearch 数据迁移方案

es6.8到7.1.1

采用方式

ElasticSearch-dump

安装

  1. npm install elasticdump -g
  2. 在 D:\environment\nodejs\node_global\node_modules\elasticdump\bin 目录下运行命令

相关命令

  1. 在线迁移单个索引
  2. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/community_encyclopedia --output=http://localhost:9200/community_encyclopedia --type=mapping
  3. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/community_encyclopedia --output=http://localhost:9200/community_encyclopedia --type=data
  4. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/orgnization --output=http://localhost:9200/orgnization --type=mapping
  5. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/orgnization --output=http://localhost:9200/orgnization --type=data
  6. elasticdump --input=http://localhost:9200/dragon --output=http://"elastic:pB0@uI"@10.221.50.102:9200/dragon --type=mapping
  7. elasticdump --input=http://localhost:9200/dragon --output=http://"elastic:pB0@uI"@10.221.50.102:9200/dragon --type=data
  8. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/community_encyclopedia --output=http://"elastic:pB0@uI"@localhost:9200/community_encyclopedia --type=mapping
  9. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/community_encyclopedia --output=https://"elastic:pB0@uI"@localhost:9200/community_encyclopedia --type=data
  10. elasticdump --input=http://"elastic:pB0@uI"@10.221.50.102:9200/community_encyclopedia --output=http://"elastic:pB0@uI"@10.7.176.72:9200/community_encyclopedia --type=mapping
  11. http://10.7.176.72:9200/
  12. dragon
  13. 离线迁移全部索引(需要手动创建目录) -- 未测试
  14. # 导出
  15. multielasticdump --direction=dump --match='^.*$' --input=http://"elastic:pB0@uI"@10.221.50.102:9200 --output=/tmp/es_backup --includeType='data,mapping' --limit=2000
  16. # 导入
  17. multielasticdump --direction=load --match='^.*$' --input=/tmp/es_backup --output=http://localhost:9200 --includeType='data,mapping' --limit=2000
  18. multielasticdump --direction=dump --match='^.*$' --input=http://"elastic:pB0@uI"@10.221.50.102:9200 --output=E:\file\back_test --includeType='data,mapping' --limit=2000
  19. multielasticdump --direction=load --match='^.*$' --input=E:\file\back_test --output=http://localhost:9200 --includeType='data,mapping' --limit=2000
参数含义
  • --input: 源地址,可为 ES 集群 URL、文件或 stdin,可指定索引,格式为:{protocol}://{host}:{port}/{index}--input-index: 源 ES 集群中的索引
  • --output: 目标地址,可为 ES 集群地址 URL、文件或 stdout,可指定索引,格式为:{protocol}://{host}:{port}/{index}
  • --output-index: 目标 ES 集群的索引
  • --type: 迁移类型,默认为 data,表明只迁移数据,可选 settings, analyzer, data, mapping, alias

参考链接

elasticsearch-dump工具:GitHub - elasticsearch-dump/elasticsearch-dump: Import and export tools for elasticsearch & opensearch

使用该工具迁移文档:使用ElasticSearch-dump进行数据迁移、备份_elasticsearchdump 大于4g_刘李404not found的博客-CSDN博客

es6.8 到 8.7.0(本地)

采用方式

ElasticSearch-dump(需要配置安全证书,暂时不采用这种方式)

logstash

启动方式

在logstash新建 mysql/logstash-es.conf 文件

然后用一下命令启动

bin\logstash.bat -f mysql\logstash-es.conf

配置文件

  1. #logstash输入配置
  2. input {
  3. elasticsearch {
  4. hosts => ["10.221.50.102:9200"]
  5. index => "community_encyclopedia"
  6. user => "elastic"
  7. password => "pB0@uI"
  8. #设置为true,将会提取ES文档的元数据信息,例如indextype和id。貌似没用
  9. # docinfo => true
  10. }
  11. }
  12. #logstash输出配置
  13. output {
  14. # stdout { codec => json_lines}
  15. stdout { codec => rubydebug}
  16. elasticsearch {
  17. hosts => ["localhost:9200"]
  18. user => "elastic"
  19. password => "pB0@uI"
  20. ssl => true
  21. ssl_certificate_verification => false
  22. index => "community_encyclopedia"
  23. document_id => "%{id}"
  24. }
  25. }

es6.8 到 8.7.0(测试环境)

特别说明

为保证es中索引结构相同,请先在代码端/页面 运行下保存相关接口,在es中创建索引后再进行数据迁移

采用方式

ElasticSearch-dump(需要配置安全证书,暂时不采用这种方式)

logstash

启动方式

在logstash新建 mysql/logstash-es.conf 文件

然后用一下命令启动

  1. bin\logstash.bat -f mysql\logstash-es.conf
  2. [linux]:
  3. sudo ./bin/logstash -f mysql_test/logstash-es.conf
  4. # sudo 加不加都可
  5. ./bin/logstash -f extend/job/logstash-t_settle_order_info.conf

配置文件

  1. #logstash输入配置
  2. input {
  3. elasticsearch {
  4. hosts => ["10.221.50.102:9200"]
  5. index => "community_encyclopedia"
  6. user => "elastic"
  7. password => "pB0@uI"
  8. #设置为true,将会提取ES文档的元数据信息,例如indextype和id。貌似没用
  9. # docinfo => true
  10. }
  11. }
  12. filter {
  13. mutate {
  14. remove_field => ["@version", "@timestamp"]
  15. }
  16. }
  17. #logstash输出配置
  18. output {
  19. # stdout { codec => json_lines}
  20. stdout { codec => rubydebug}
  21. elasticsearch {
  22. # hosts => ["10.7.176.72:9200"]
  23. hosts => ["10.7.176.72:9200","10.7.176.73:9200","10.7.176.74:9200"]
  24. # hosts => ["10.7.176.72:9300"]
  25. user => "elastic"
  26. # password => "pB0@uI"
  27. password => "Mvwm@n12nal"
  28. # ssl => true
  29. # ssl_certificate_verification => false
  30. index => "community_encyclopedia"
  31. document_id => "%{id}"
  32. }
  33. }

mysql到es数据迁移

订单历史数据从mysql迁移至es(单表测试)

搭伙app订单查询接口分析

  1. url:https://agentd.djbx.com/order/orderInfo/orderInfoList
  2. method:POST
  3. requestTime:2023.06.28-14:02:56:820
  4. responseTime:2023.06.28-14:02:57:31
  5. duration:211ms
  6. body:{
  7. "pageNo": "1",
  8. "pageSize": "10",
  9. "param": {
  10. "memberId": "4OnzMs73",
  11. "productType": [],
  12. "status": "3",
  13. "policyStatus": null,
  14. "startDate": null,
  15. "endDate": null,
  16. "appntName": null,
  17. "lcnNo": null
  18. }
  19. }
  20. params:{}
  21. header:{content-type: application/json; charset=utf-8, systemType: android, systemVersion: 9, deviceID: PQ3A.190705.003, bundleId: com.djcx.dahuo, appVersion: 4.3.10, phoneModel: Xiaomi 2203121C, token: 7pyvLZQfD6UUP0d4yPTgkXTgdq8, content-length: 174}

采用方式

logstash

启动运行

在【程序目录】目录执行以下命令启动:

  1. 【windows】bin\logstash.bat -f mysql\logstash-db-sync.conf
  2. 【linux】nohup ./bin/logstash -f mysql_test/logstash-es.conf & (后台)
  3. sudo ./bin/logstash -f mysql_test/logstash-es.conf

需求

从mysql:dh.order 的 t_car_order 中迁移已支付和已取消的订单到es

条件:ORDER_STATUS in (3,4)

数据量

一个月30w,一年300w(占用磁盘大约1GB)

索引类创建方案

按月分索引:数据量太少,不建议

按年划分索引:感觉可行,试一下

都试一下,看看查询效率,删除效率如何(什么情况下会删除数据呢?)

logstash相关配置

  1. #logstash输入配置
  2. input {
  3. #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  4. jdbc {
  5. type => "jdbc"
  6. jdbc_connection_string => "jdbc:mysql://10.221.50.106:3306/dh_order?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false"
  7. # 数据库连接账号密码;
  8. jdbc_user => "dh_test"
  9. jdbc_password => "Y2017dh123"
  10. # MySQL依赖包路径;
  11. jdbc_driver_library => "mysql/mysql-connector-java-5.1.49.jar"
  12. jdbc_driver_class => "com.mysql.jdbc.Driver"
  13. # 数据库重连尝试次数
  14. connection_retry_attempts => "3"
  15. # 判断数据库连接是否可用,默认false不开启
  16. jdbc_validate_connection => "true"
  17. # 数据库连接可用校验超时时间,默认3600S
  18. jdbc_validation_timeout => "3600"
  19. # 是否开启分页
  20. jdbc_paging_enabled => true
  21. # statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  22. # statement => "SELECT * FROM `t_car_order`"
  23. # statement => "SELECT DATE_FORMAT(tco.create_time, '%Y.%m') index_date, tco.* FROM t_car_order tco limit 200"
  24. statement => "SELECT DATE_FORMAT(tco.create_time, '%Y') index_year, tco.* FROM t_car_order tco"
  25. # statement => "SELECT ID, order_no, order_type, MEMBER_ID, owner_id, team_id, CHANNEL_CODE, pay_type, PAY_NO, TOTAL_AMOUNT, PAY_AMOUNT, DISCOUNT_AMOUNT, DISCOUNT_RATE, ROLE_TYPE, PRODUCT_VALUE, ORDER_STATUS, SUB_ORDER_STATUS, step_url, total_annual_prem, paidTime, subject, transfer, identity, msg, date_format(update_time,'%Y-%m-%d %H:%i:%s') update_time, date_format(create_time,'%Y-%m-%d %H:%i:%s') create_time, uuid, renewal_redis_key, clue_id, union_type, system_type, operation_code, cx_order_no, fin_typ
  26. # FROM t_car_order limit 10"
  27. # statement => "SELECT * FROM t_car_order limit 1"
  28. # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
  29. # lowercase_column_names => false
  30. # Value can be any of: fatal,error,warn,info,debug,默认info;
  31. # sql_log_level => warn
  32. sql_log_level => debug
  33. # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
  34. # record_last_run => true
  35. # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
  36. # use_column_value => true
  37. # 需要记录的字段,用于增量同步,需是数据库字段
  38. # tracking_column => "ModifyTime"
  39. # Value can be any of: numeric,timestamp,Default value is "numeric"
  40. # tracking_column_type => timestamp
  41. # record_last_run上次数据存放位置;
  42. # last_run_metadata_path => "mysql/last_id.txt"
  43. # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
  44. # clean_run => false
  45. # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
  46. # schedule => "*/5 * * * * *"
  47. # 用来控制增量更新的字段,一般是自增id或者创建、更新时间,注意这里要采用sql语句中select采用的字段别名
  48. # tracking_column => "unix_ts_in_secs"
  49. # tracking_column 对应字段的类型
  50. # tracking_column_type => "numeric"
  51. # timezone => "Asia/Shanghai" # 你的时区
  52. }
  53. }
  54. #logstash输入数据的字段匹配和数据过滤
  55. # filter {
  56. # mutate {
  57. # copy => { "id" => "[@metadata][_id]"}
  58. # remove_field => ["id", "@version", "unix_ts_in_secs"]
  59. # }
  60. # }
  61. filter {
  62. # date {
  63. # match => ["update_time", "yyyy-MM-dd HH:mm:ss"]
  64. # target => "update_time"
  65. # }
  66. # grok {
  67. # match => {"message" => "(?<create_time>(?:%{YEAR}-%{MONTHNUM2}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}))"}
  68. # }
  69. # mutate {
  70. # add_field => {"temp_ts" => "%{create_time}"}
  71. # }
  72. # date {
  73. # match => ["temp_ts", "yyyy-MM-dd HH:mm:ss"]
  74. # target => "@timestamp"
  75. # }
  76. # date {
  77. # match => ["create_time", "yyyy-MM-dd HH:mm:ss"]
  78. # target => "create_time"
  79. # }
  80. # ruby {
  81. # code => 'event.set("create_time", event.get("create_time").strftime("%Y-%m-%d %H:%M:%S"))'
  82. # }
  83. # date {
  84. # # match => ["create_time", "yyyy-MM-dd HH:mm:ss"]
  85. # match => ["create_time", "ISO8601"]
  86. # target => "create_time"
  87. # timezone => "Asia/Shanghai" # 你的时区
  88. # # timezone => "America/New_York" # 你的时区
  89. # }
  90. # mutate {
  91. # add_field => { "index_date" => "%{create_time}" }
  92. # }
  93. # mutate {
  94. # rename => { "create_time_string" => "index_date" }
  95. # }
  96. # date {
  97. # # match => ["index_date", "ISO8601"]
  98. # match => ["index_date", "ISO8601"]
  99. # # target => "index_date"
  100. # }
  101. # }
  102. # date {
  103. # match => ["index_date", "yyyy-MM-dd HH:mm:ss"]
  104. # # target => "index_date"
  105. # # target => "index_date"
  106. # }
  107. # mutate {
  108. # add_field => {
  109. # "index_date1" => "%{index_date}"
  110. # }
  111. # mutate {
  112. # remove_field => ["@version","index_date"]
  113. # }
  114. ruby {
  115. code => "event.set('@timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  116. }
  117. ruby {
  118. code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)"
  119. }
  120. ruby {
  121. code => "event.set('create_time', event.get('create_time').time.localtime + 8*60*60)"
  122. }
  123. # ruby {
  124. # code => "event.set('create_time',event.get('timestamp'))"
  125. # }
  126. # mutate {
  127. # add_field => { "[@metadata][index_date]" => "%{index_date}" }
  128. # remove_field => ["@version", "index_date"]
  129. # }
  130. mutate {
  131. add_field => { "[@metadata][index_year]" => "%{index_year}" }
  132. remove_field => ["@version", "index_year"]
  133. }
  134. }
  135. #logstash输出配置
  136. output {
  137. # 采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  138. # stdout { codec => json_lines}
  139. stdout { codec => rubydebug}
  140. # 指定输出到ES的具体索引
  141. # elasticsearch {
  142. # index => "rdbms_sync_idx"
  143. # document_id => "%{[@metadata][_id]}"
  144. # }
  145. elasticsearch {
  146. # host => "192.168.1.1"
  147. # port => "9200"
  148. # 配置ES集群地址
  149. # hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
  150. hosts => ["localhost:9200"]
  151. # 索引名字,必须小写
  152. # index => "t_car_order-%{+YYYY.MM.dd}"
  153. # index => "t_car_order-%{index_date}"
  154. # index => "t_car_order-%{[@metadata][index_date]}"
  155. # index => "t_car_order_@timestamp"
  156. index => "t_car_order"
  157. # index => "t_car_order-%{[@metadata][index_year]}"
  158. # 数据唯一索引(建议使用数据库KeyID)
  159. # document_id => "%{KeyId}"
  160. document_id => "%{id}"
  161. # document_id => "ID"
  162. }
  163. }

参考链接

使用logstash同步MySQL数据到ES

使用logstash实现mysql到ES的数据迁移

通过Logstash实现mysql数据定时增量同步到ES

通过Logstash将RDS MySQL数据同步至Elasticsearch

日期插件语法(官网)

问题

logstash日期格式转换问题(暂未解决)

多个logstash版本测试

日期转换不可用,sql层面也不行,打算在接口层处理。

日期格式问题,暂缓。。。 心累了,估计是日期插件问题,全网没有找到解决方案,走sql修改吧

提了个bug:无法将date类型进行格式化操作 · Issue #158 · logstash-plugins/logstash-filter-date · GitHub

参考链接

Logstash:获取业务时间并替换@timestamp

Elasticsearch 滞后8个小时等时区问题,一网打尽!

logstash 7.x 中时间问题,@timestamp 与本地时间相差 8个小时

现有订单数据保存到es方案

修改保存接口,保存到mysql改为es,或者可以考虑消息队列中间件

订单历史数据从mysql迁移至es(实际业务场景)

背景1:由于无法在单sql中查询数据,因此无法使用logstash工具

背景2:订单迁移分为订单详情和订单列表两大类

背景3:订单信息涉及多张表,无法进行单表简单迁移

迁移方案

订单信息获取

search服务设置一个定时任务,通过feign调用car服务查询接口,进行全量和增量查询需要的订单信息

订单信息迁移至es

分两大类创建索引:订单详情和订单列表,每一类按照年维度进行索引构建,用别名进行查询

es结构按订单号为id,报文信息为message

定时任务详情

参数

订单创建日期开始日期,订单创建日期截止日期

默认参数:最近三个月

如何获取符合条件的增量数据的订单号?通过binlog日志

通过时间段(每三个月跑一次)

参考链接

java与es8实战之六:用JSON创建请求对象(比builder pattern更加直观简洁)

参考文档

Elasticsearch7.x中文文档

es7.0版本升级变化(官网)

ELK Stack

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/923644
推荐阅读
相关标签
  

闽ICP备14008679号