当前位置:   article > 正文

canal 同步mysql binlog 日志到 elasticsearch7_用binglog实现es同步

用binglog实现es同步

目录

一、安装 canal 和 elastic

1、mysql数据库赋予可读binlog 权限

2、在服务器创建 canal 用户并新建相应目录

3、下载canal-1.1.5 和 elasticsearch-7.16.0

4、修改deployer配置

4.1 配置修改如下:

4.2 启动deployer

5. 修改adapter 配置

5.1 配置修改如下 

5.2 增加同步到es的表结构数据(此处的字段需要与es中mapping能对应上)

5.3 启动adapter

6、安装es

6.1 服务器配置,设置vm.max_map_count。

6.2 修改elastic 配置文件 elasticsearch.yml

6.3 启动es

6.4、es索引基本操作

6.5、使用canal同步数据到es前,需要先在es中增加mapping结构映射

6.6、为了保证数据的同步,最好加上手动同步和定时校验的机制。


      日常开发过程中,我们有80%的时间都是使用查询操作,把所有的压力都放在mysql slave上也会存在访问瓶颈,所以我们选用elasticsearch 来查询数据,如何保证es和mysql之间的数据同步呢。

        普通的做法是在程序中将 mysql 发生变化的数据 进行监听,将有发生变化的数据 增加到es中。这种做法可取,并且利用模型监听的方式 也可以实现和业务代码分离。

        第二种做法是使用阿里自研的canal,canal是模拟mysql slave 来拉取mysql的binglog 日志,解析出日志中的sql 变更同步到es中。

一、安装 canal 和 elastic

1、mysql数据库赋予可读binlog 权限

  1. grant REPLICATION CLIENT on *.* to canal@"172.31.%" IDENTIFIED BY "canal";
  2. grant REPLICATION SLAVE on *.* to canal@"172.31.%" IDENTIFIED BY "canal";

2、在服务器创建 canal 用户并新建相应目录

  1. groupadd canal
  2. useradd -g canal canal
  3. mkdir -p /data/canal/
  4. cd /data/canal/
  5. mkdir canal.deployer-1.1.5
  6. mkdir canal.adapter-1.1.5
  7. mkdir canal.adapter-1.1.5-SNAPSHOT
  8. chown -R canal.canal /data/canal/
  9. su canal

3、下载canal-1.1.5 和 elasticsearch-7.16.0

简介:

canal.deployer-1.1.5.tar.gz对应的是canal的server端,负责订阅并解析Mysql-Binlog

canal.adapter-1.1.5.tar.gz对应的是适配器,负责将server的binlog转换并发送给对应的应用

canal.admin-1.1.5.tar.gz一个可视化webui可以不安装

额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug,无法insert数据到es)

  1. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  2. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
  3. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
  4. wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.0-linux-x86_64.tar.gz
  5. tar zxf canal.deployer-1.1.5.tar.gz -C /data/canal/canal.deployer-1.1.5/
  6. tar zxf canal.adapter-1.1.5.tar.gz -C /data/canal/canal.adapter-1.1.5/
  7. tar zxf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /data/canal/canal.adapter-1.1.5-SNAPSHOT/
  8. tar zxf elasticsearch-7.16.0-linux-x86_64.tar.gz -C /data/canal/
  9. cd /data/canal/canal.adapter-1.1.5-SNAPSHOT/plugin/
  10. mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
  11. cp client-adapter.es7x-1.1.5-jar-with-dependencies.jar /data/canal/canal.adapter-1.1.5/plugin/

4、修改deployer配置

  1. cd /data/canal/canal.deployer-1.1.5/conf/example/
  2. cp instance.properties instance.properties.bak
  3. vim instance.properties

4.1 配置修改如下:

  1. #position info
  2. #mysql数据库地址
  3. canal.instance.master.address=127.0.0.1:3306
  4. #username/password mysql 数据库账号和密码
  5. canal.instance.dbUsername=canal
  6. canal.instance.dbPassword=canal
  7. #table regex
  8. #canal.instance.filter.regex=.\..
  9. #配置需要同步binlog日志的数据库和表
  10. canal.instance.filter.regex=库1\\.order.*,库2\\.log.*,库3\\.user.*
  11. #table black regex
  12. canal.instance.filter.black.regex=mysql\.slave_.*

指定全库全表: .*\\..*

指定库全表 canal.instance.filter.regex. 库名..* test\..*

单表 库名.表名 test.user

多规则组合使用

库名1..*,库名2.表名1,库名3.表名2 (逗号分隔)

test\..*,test2.user1,test3.user2 (逗号分隔)

4.2 启动deployer

  1. cd /data/canal/canal.deployer-1.1.5/
  2. sh bin/startup.sh && tail -f logs/example/meta.log

5. 修改adapter 配置

  1. cd /data/canal/canal.adapter-1.1.5/conf
  2. cp application.yml application.yml.bak
  3. vim application.yml

5.1 配置修改如下 

#配置mysql数据

srcDataSources:

  defaultDS:

  url: jdbc:mysql://127.0.0.1:3306/user?useUnicode=true

  username: canal

  password: canal

- name: es7

  key: eskey #es全量更新时的key

  hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode #estcp

  properties:

   mode: transport #transport # or rest

 # security.auth: test:123456 # only used for rest mode

    #es集群名称

   cluster.name: elasticsearch

5.2 增加同步到es的表结构数据(此处的字段需要与es中mapping能对应上)

dataSourceKey: defaultDS

outerAdapterKey: eskey

destination: example

groupId: g1

esMapping:

_index: 索引名称

_type: order

_id: _id

upsert: true

#relations:

# dc_order:

# name: order

sql: "SELECT o.id AS _id,o.user_id,o.product_id,o.product_name,o.updated_at FROM order o"

etlCondition: "where o.updated_at>={}"

commitBatch: 3000

5.3 启动adapter

  1. cd /data/canal/canal.adapter-1.1.5/
  2. sh bin/startup.sh

6、安装es

6.1 服务器配置,设置vm.max_map_count。

  1. echo "vm.max_map_count=262144" >> /etc/sysctl.conf
  2. sysctl -p

6.2 修改elastic 配置文件 elasticsearch.yml

不同服务器的es集群配置

(master) 节点增加如下配置

  1. node.name: node-1
  2. cluster.initial_master_nodes: ["node-1"]
  3. network.host: 0.0.0.0
  4. network.publish_host: 0.0.0.0
  5. http.port: 9200
  6. transport.tcp.port: 9300
  7. #如果新增服务器,在seed_hosts中增加ip
  8. discovery.seed_hosts: ["171.26.132.2", "171.26.132.5"]
  9. #禁止外网下载坐标
  10. ingest.geoip.downloader.enabled: false

(slave) 节点增加如下配置

  1. node.name: node-2
  2. #cluster.initial_master_nodes: ["node-2"]
  3. network.host: 0.0.0.0
  4. network.publish_host: 0.0.0.0
  5. http.port: 9200
  6. transport.tcp.port: 9300
  7. #如果新增服务器,在seed_hosts中增加ip
  8. discovery.seed_hosts: ["171.26.132.2", "171.26.132.5"]
  9. ingest.geoip.downloader.enabled: false

同一台服务器启动多个es实例配置

(master) 配置

  1. node.name: node-1
  2. cluster.initial_master_nodes: ["node-1"]
  3. network.host: 0.0.0.0
  4. network.publish_host: 0.0.0.0
  5. http.port: 9200
  6. transport.tcp.port: 9300
  7. discovery.seed_hosts: ["127.0.0.1", "127.0.0.1"]

(slave) 配置

  1. node.name: node-2
  2. #cluster.initial_master_nodes: ["node-1"]
  3. network.host: 0.0.0.0
  4. network.publish_host: 0.0.0.0
  5. #http.port: 9200
  6. #transport.tcp.port: 9300
  7. discovery.seed_hosts: ["127.0.0.1", "127.0.0.1"]

es 设置密码(内网可以不设置) 设置es密码

6.3 启动es

  1. cd /data/canal/elasticsearch-7.16.0/
  2. #加参数 -d 表示守护模式
  3. ./bin/elasticsearch -d

6.4、es索引基本操作

  1. Order初始化: curl --request POST --url http://localhost:8081/etl/es7/eskey/order.yml --form 'params=0' #eskey 为canal.adapter-1.1.5/application.yml 中canalAdapters: 下的 key:
  2. 查看全部索引: curl http://127.0.0.1:9200/_cat/indices
  3. 删除索引: curl -X DELETE http://127.0.0.1:9200/索引?pretty
  4. 查询映射: curl -X GET http://127.0.0.1:9200/索引/_mapping?pretty
  5. 查询索引数据量:curl http://127.0.0.1:9200/_count?pretty
  6. 查询单个id:curl http://127.0.0.1:9200/索引/type/_search?pretty -X GET -H 'Content-Type:application/json' -d '{"query":{"match":{"_id":"16528875759937320"}}}'

6.5、使用canal同步数据到es前,需要先在es中增加mapping结构映射

        映射的结构可以与数据表一致, 也可以只是部分字段。

curl -X PUT http://127.0.0.1:9200/索引名称?include_type_name=true -H 'Content-Type:application/json' -d '{"mappings":{"类型名称":{"properties":{"id":{"type":"long"},","user_id":{"type":"long"},"user_name":{"type":"keyword"},"order_no":{"type":"keyword"},"product_id":{"type":"integer"},"order_status":{"type":"integer"},"status":{"type":"integer"},"created_at":{"type":"integer"},"updated_at":{"type":"integer"},"created_by":{"type":"keyword"},"updated_by":{"type":"keyword"}}}}}'

6.6、为了保证数据的同步,最好加上手动同步和定时校验的机制。

6.6.1 每隔2分钟增量同步前15分钟的数据

curl --request POST --url http://localhost:8081/etl/es7/eskey/order.yml --form "params=0"

6.6.2 定时校验机制

每隔一段时间,将数据库和es的数据全量对比,找出es中缺失的数据并重新添加进去。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/830333
推荐阅读
相关标签
  

闽ICP备14008679号