当前位置:   article > 正文

Logstash同步mysql一对多数据到ES(踩坑日记系列)_logstash sql排序

logstash sql排序

场景:

Logstash 、Kibana、ES版本:6.3.1。

使用Logstash从mysql同步用户和用户所有的宠物到ES中。

希望的格式:

  1. "register_name": "孟林洁",
  2. "id": 80469531,
  3. "pets": [
  4. {
  5. "breed_name": "万能梗",
  6. "birthday": null,
  7. "pet_id": 999044,
  8. "name": "一只狗",
  9. "images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}",
  10. "breed_id": 130
  11. },
  12. {
  13. "breed_name": "万能梗",
  14. "birthday": null,
  15. "pet_id": 999097,
  16. "name": "一只狗2",
  17. "images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}",
  18. "breed_id": 130
  19. }
  20. ],
  21. "mobile": "*******",
  22. "avatar": null,
  23. "pet_list": [
  24. 999044,
  25. 999097
  26. ]

问题:

  1. logstash同步nested嵌套类型到ES中。
  2. logstash同步嵌套数组对象时,聚合过程中数据丢失(用户宠物会随机丢失,偶而数据不丢失)。
  3. logstash同步时少同步一条数据,在停止logstash服务时才进行同步
  4. (更新) mysql的多条数据同步到es只有一条

解决:

1、解决logstash同步nested嵌套类型到ES中

先创建索引,并且修改索引类型为nested

  1. 创建索引:PUT /user
  2. 修改索引映射:
  3. PUT /user/_mapping/doc
  4. {
  5. "doc": {
  6. "properties": {
  7. "avatar": {
  8. "type": "text"
  9. },
  10. "id": {
  11. "type": "long"
  12. },
  13. "mobile": {
  14. "type": "text"
  15. },
  16. "pets": {
  17. "type": "nested",
  18. "properties": {
  19. "birthday": {
  20. "type": "date"
  21. },
  22. "breed_id": {
  23. "type": "long"
  24. },
  25. "breed_name": {
  26. "type": "text",
  27. "analyzer": "ik_max_word",
  28. "search_analyzer": "ik_max_word"
  29. },
  30. "images": {
  31. "type": "text"
  32. },
  33. "name": {
  34. "type": "text",
  35. "analyzer": "ik_max_word",
  36. "search_analyzer": "ik_max_word"
  37. },
  38. "pet_id": {
  39. "type": "long"
  40. }
  41. }
  42. },
  43. "register_name": {
  44. "type": "text",
  45. "analyzer": "ik_max_word",
  46. "search_analyzer": "ik_max_word"
  47. }
  48. }
  49. }
  50. }

使用logstash的过滤器中aggregate插件进行数据聚合。

配置文件jdbc3.conf

  1. input {
  2. stdin {}
  3. jdbc {
  4. jdbc_driver_library => "../mysql-connector-java-6.0.6.jar"
  5. jdbc_driver_class => "com.mysql.jdbc.Driver"
  6. jdbc_connection_string => "jdbc:mysql://****.com:3306/food-dev"
  7. jdbc_user => "****"
  8. jdbc_password => "****"
  9. #jdbc_paging_enabled => "true"
  10. #jdbc_page_size => "50"
  11. clean_run => true
  12. use_column_value => true
  13. record_last_run => "true"
  14. tracking_column => "id"
  15. schedule => "*/1 * * * *"
  16. #last_run_metadata_path => "/Users/menglinjie/ES-node/testdata.text"
  17. statement => "select u.id,u.register_name,u.mobile,u.avatar,u.status,svp.id as pet_id,svp.name,svp.images,svp.gender,svp.birthday,pb.id as breed_id,pb.name as breed_name from user u left join store_vip_pet svp on svp.user_id = u.id and svp.pet_status = 1 left join pet_breed pb on svp.breed_id = pb.id order by u.id desc"
  18. }
  19. }
  20. filter {
  21. #这里做聚合
  22. aggregate {
  23. task_id => "%{id}"
  24. code => "
  25. map['id'] = event.get('id')
  26. map['register_name'] = event.get('register_name')
  27. map['mobile'] = event.get('mobile')
  28. map['avatar'] = event.get('avatar')
  29. map['pet_list'] ||=[]
  30. map['pets'] ||=[]
  31. if (event.get('pet_id') != nil)
  32. if !(map['pet_list'].include? event.get('pet_id'))
  33. map['pet_list'] << event.get('pet_id')
  34. map['pets'] << {
  35. 'pet_id' => event.get('pet_id'),
  36. 'name' => event.get('name'),
  37. 'images' => event.get('images'),
  38. 'breed_id' => event.get('breed_id'),
  39. 'breed_name' => event.get('breed_name'),
  40. 'birthday' => event.get('birthday')
  41. }
  42. end
  43. end
  44. event.cancel()
  45. "
  46. push_previous_map_as_event => true
  47. timeout => 5
  48. }
  49. json {
  50. source => "message"
  51. remove_field => ["message"]
  52. #remove_field => ["message", "type", "@timestamp", "@version"]
  53. }
  54. mutate {
  55. #将不需要的JSON字段过滤,且不会被存入 ES 中
  56. remove_field => ["tags", "@timestamp", "@version"]
  57. }
  58. }
  59. output {
  60. stdout {
  61. #codec => json_lines
  62. }
  63. elasticsearch {
  64. hosts => ["127.0.0.1:9200"]
  65. index => "user"
  66. document_id => "%{id}"
  67. }
  68. }

2、解决聚合过程中子数组对象丢失

刚开始考虑到是否是sql查询分页问题,导致多宠物没有一起聚合,而是分开聚合。但是通过看日志和数据发现每次丢失的数据不同,没有任何规律性。

随机性的问题让我想到多线程,然后查找logstash配置。在config/logstash.yml中有以下配置:

  1. # ------------ Pipeline Settings --------------
  2. #
  3. # The ID of the pipeline.
  4. # 管道id
  5. # pipeline.id: test1
  6. #
  7. # Set the number of workers that will, in parallel, execute the filters+outputs
  8. # stage of the pipeline.
  9. #
  10. # This defaults to the number of the host's CPU cores.
  11. #output 和 filter的线程数,默认是cpu核数
  12. # pipeline.workers: 1
  13. #
  14. # How many events to retrieve from inputs before sending to filters+workers
  15. #
  16. # pipeline.batch.size: 1000
  17. #
  18. # How long to wait in milliseconds while polling for the next event
  19. # before dispatching an undersized batch to filters+outputs
  20. #
  21. # pipeline.batch.delay: 50

问题定位:多线程跑聚合过程中,同一个用户的多个宠物可能被分配到不通过的线程,分别做不同的聚合,导致一个用户存在多条数据,分别拥有不同的宠物,然后多线程的进行输出到ES,ES保存过程中会把存在的数据给更新掉,这就是我的宠物丢失的原因,多线程分配的随机性导致数据也随机丢失。

尝试修改线程数,验证猜想是否正确。

指定配置文件运行会使logstash忽略logstash.yml配置。所以在logstash.yml指定配置文件,运行logstash时不用指定配置,logstash会自动寻找logstash.yml配置

# path.config: /Users/menglinjie/ES-node/logstash-6.3.1/conf.d

验证后确认猜想正确。

回想刚才把线程数设置为1,这样肯定会影响性能的吧,万一以后我有不需要聚合的的数据时完全可以多线程跑。Logstash提供的pipelines.yml可以配置多管道,使不同的同步任务绑定不同管道配置。

这里pipeline.workers: 4,pipeline.output.workers: 3,那么执行聚合的filter就是1,这样可以单线程聚合,多线程输出。

多个任务可以配置多个管道,pipeline.id标示管道唯一性。

  1. - pipeline.id: user_pipeline
  2. pipeline.workers: 4
  3. pipeline.batch.size: 1000
  4. # 输出
  5. pipeline.output.workers: 3
  6. # 配置文件位置
  7. path.config: "/Users/menglinjie/ES-node/logstash-6.3.1/conf.d/*.conf"
  8. # 对基于磁盘的排队进行“持久化”。默认值是内存
  9. queue.type: persisted

更新:

影响聚合结果的还有sql语句!! 

sql语句必须根据聚合task_id排序,也就是需要聚合的数据必须排在一起。否则map['pets']会被覆盖掉,导致数据丢失。

3、logstash同步时少同步一条数据,在停止logstash服务时才进行同步

在filter 聚合配置中添加:

timeout => 3

filter aggregate 创建中 event map 并不知道我、这次事件是不是应该结束,也就是它也不知道到那一条才是最后一条, 因此设置一个 timeout 告诉它这个时间执行多少秒就结束继续执行第二个。但这样并不是很严谨,因为你也不确定你的 event map 到底要执行多久 。最好的方式是 我们应该给定一个 task end 的条件 ES官网关于 aggregate 的说明

4、es 配置id的问题,必须有唯一性,否则被覆盖

参考链接:

https://segmentfault.com/a/1190000016592277

https://segmentfault.com/q/1010000016861266

https://blog.csdn.net/weixin_33910460/article/details/88719101

https://elasticsearch.cn/question/6648

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/312725
推荐阅读
相关标签
  

闽ICP备14008679号