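Contents
3. Download canal-1.1.5 and elasticsearch-7.16.0
5.2 Add the table structure to sync to ES (the fields here must match the ES mapping)
6.2 Edit the Elasticsearch configuration file elasticsearch.yml
6.5 Before using canal to sync data to ES, create the mapping in ES first
6.6 To keep the data in sync, add a manual sync and a scheduled verification mechanism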
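In day-to-day development, about 80% of our database operations are queries. Putting all of that load on MySQL slaves eventually becomes a bottleneck, so we query Elasticsearch instead. That raises the question of how to keep ES in sync with MySQL.
The common approach is to listen for data changes inside the application and write the changed rows to ES. This works, and implementing it with model listeners (model events) keeps the sync logic separate from the business code.
The second approach is to use canal, developed by Alibaba. canal poses as a MySQL slave, pulls the MySQL binlog, parses the data changes recorded in it, and syncs them to ES.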
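A minimal sketch of the first approach, assuming an ORM that lets you register callbacks that run after a model is saved or deleted. Order, OrderModel, SearchIndex and register_es_sync are hypothetical names, and SearchIndex is an in-memory stand-in for an ES index, not a real Elasticsearch client. The point is that the business code only calls save/delete, while the listener keeps the index updated.

```python
"""Sketch of approach 1: push changed rows to a search index from the app.

All names here are hypothetical; a real project would hook its ORM's model
events and call an Elasticsearch client instead of SearchIndex.
"""
from dataclasses import dataclass, asdict
from typing import Callable, Dict, List


class SearchIndex:
    """In-memory stand-in for an ES index: doc id -> document."""

    def __init__(self) -> None:
        self.docs: Dict[int, dict] = {}

    def upsert(self, doc_id: int, doc: dict) -> None:
        self.docs[doc_id] = doc

    def delete(self, doc_id: int) -> None:
        self.docs.pop(doc_id, None)


@dataclass
class Order:
    id: int
    user_id: int
    product_id: int
    updated_at: int


class OrderModel:
    """Minimal 'model' that notifies listeners after save/delete."""

    def __init__(self) -> None:
        self.rows: Dict[int, Order] = {}
        self.saved_listeners: List[Callable[[Order], None]] = []
        self.deleted_listeners: List[Callable[[int], None]] = []

    def save(self, order: Order) -> None:
        self.rows[order.id] = order  # write to the database (simulated)
        for listener in self.saved_listeners:
            listener(order)  # then notify listeners

    def delete(self, order_id: int) -> None:
        self.rows.pop(order_id, None)
        for listener in self.deleted_listeners:
            listener(order_id)


def register_es_sync(model: OrderModel, index: SearchIndex) -> None:
    """Wire the index to the model so business code never touches ES."""
    model.saved_listeners.append(lambda o: index.upsert(o.id, asdict(o)))
    model.deleted_listeners.append(index.delete)
```

The weak spot of this design is that any write that bypasses the model (raw SQL, another service, a manual fix in the database) never reaches ES, which is one reason to consider the binlog-based approach.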
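Create a MySQL account for canal. canal needs the SELECT, REPLICATION SLAVE and REPLICATION CLIENT privileges, and the MySQL binlog must be enabled in ROW format (binlog_format=ROW):
- CREATE USER canal@"172.31.%" IDENTIFIED BY "canal";
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@"172.31.%";
- FLUSH PRIVILEGES;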
- groupadd canal
- useradd -g canal canal
- mkdir -p /data/canal/
- cd /data/canal/
- mkdir canal.deployer-1.1.5
- mkdir canal.adapter-1.1.5
- mkdir canal.adapter-1.1.5-SNAPSHOT
- chown -R canal:canal /data/canal/
- su canal
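Overview:
canal.deployer-1.1.5.tar.gz is the canal server; it subscribes to and parses the MySQL binlog.
canal.adapter-1.1.5.tar.gz is the adapter; it converts the binlog events received from the server and sends them to the target application (here, ES).
canal.admin-1.1.5.tar.gz is an optional web UI and does not need to be installed.
You also need to download canal.adapter-1.1.5-SNAPSHOT.tar.gz from the v1.1.5-alpha-2 snapshot release, because the ES jar in the 1.1.5 release has a bug that prevents inserting data into ES.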
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
- wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.0-linux-x86_64.tar.gz
- tar zxf canal.deployer-1.1.5.tar.gz -C /data/canal/canal.deployer-1.1.5/
- tar zxf canal.adapter-1.1.5.tar.gz -C /data/canal/canal.adapter-1.1.5/
- tar zxf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /data/canal/canal.adapter-1.1.5-SNAPSHOT/
- tar zxf elasticsearch-7.16.0-linux-x86_64.tar.gz -C /data/canal/
- cd /data/canal/canal.adapter-1.1.5-SNAPSHOT/plugin/
- mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
- cp client-adapter.es7x-1.1.5-jar-with-dependencies.jar /data/canal/canal.adapter-1.1.5/plugin/
- cd /data/canal/canal.deployer-1.1.5/conf/example/
- cp instance.properties instance.properties.bak
- vim instance.properties
- #position info
- # MySQL database address
- canal.instance.master.address=127.0.0.1:3306
-
- #username/password: MySQL account and password
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
-
- #table regex
- #canal.instance.filter.regex=.*\\..*
- # databases and tables whose binlog should be synced
- canal.instance.filter.regex=db1\\.order.*,db2\\.log.*,db3\\.user.*
-
- #table black regex
- canal.instance.filter.black.regex=mysql\\.slave_.*
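Filter rule syntax (in the properties file, \\ stands for a single backslash):
All databases and tables: .*\\..*
All tables in one database: dbname\\..*, e.g. test\\..*
A single table: dbname.tablename, e.g. test.user
Several rules combined, separated by commas:
dbname1\\..*,dbname2.table1,dbname3.table2
test\\..*,test2.user1,test3.user2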
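To check a filter before restarting canal, the rules can be modelled roughly as follows. This is a simplified sketch, not canal's own matcher: it assumes each comma-separated rule is a regex matched against the whole "schema.table" string, and that the value has already been unescaped from the properties file (so test\\..* in the file becomes test\..* here). matches_filter is a hypothetical helper name.

```python
"""Rough model of canal.instance.filter.regex matching (simplified sketch)."""
import re


def matches_filter(filter_value: str, schema: str, table: str) -> bool:
    """Return True if 'schema.table' fully matches any comma-separated rule."""
    name = f"{schema}.{table}"
    rules = [rule.strip() for rule in filter_value.split(",") if rule.strip()]
    return any(re.fullmatch(rule, name) is not None for rule in rules)


if __name__ == "__main__":
    rules = r"test\..*,test2.user1,test3.user2"
    for schema, table in [("test", "orders"), ("test2", "user1"), ("test2", "user2")]:
        print(schema, table, matches_filter(rules, schema, table))
```

One consequence worth noticing: in a single-table rule such as test2.user1 the unescaped dot matches any character, so test2\\.user1 is the stricter spelling.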
- cd /data/canal/canal.deployer-1.1.5/
- sh bin/startup.sh && tail -f logs/example/meta.log
- cd /data/canal/canal.adapter-1.1.5/conf
- cp application.yml application.yml.bak
- vim application.yml
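# canal.adapter-1.1.5/conf/application.yml (only the relevant keys are shown)
canal.conf:
  # MySQL source data source
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/user?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7
        key: eskey # key used by the ES full-sync (ETL) endpoint
        hosts: 127.0.0.1:9300 # ES transport (TCP) port; use 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 # only used for rest mode
          # ES cluster name
          cluster.name: elasticsearch

# canal.adapter-1.1.5/conf/es7/order.yml (the table structure to sync; its fields must match the ES mapping)
dataSourceKey: defaultDS
outerAdapterKey: eskey
destination: example
groupId: g1
esMapping:
  _index: index_name # replace with your ES index name
  _type: order
  _id: _id
  upsert: true
  #relations:
  #  dc_order:
  #    name: order
  sql: "SELECT o.id AS _id,o.user_id,o.product_id,o.product_name,o.updated_at FROM order o"
  etlCondition: "where o.updated_at>={}"
  commitBatch: 3000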
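To make `_id: _id` and `upsert: true` concrete, here is roughly what they amount to in Elasticsearch terms: the `_id` column of each row selected by `sql` becomes the document id, and each row is written as an update with doc_as_upsert, so a missing document is created and an existing one has its fields merged. This is an illustrative sketch of the ES _bulk request format, not canal's internal code; rows_to_bulk and the index name are hypothetical.

```python
"""Turn rows from the mapping's SQL into an ES _bulk body (illustration only)."""
import json
from typing import Iterable, Mapping


def rows_to_bulk(index: str, rows: Iterable[Mapping]) -> str:
    """Build an NDJSON _bulk body; the `_id` column becomes the document id."""
    lines = []
    for row in rows:
        doc = dict(row)
        doc_id = doc.pop("_id")  # `SELECT o.id AS _id` supplies the ES _id
        lines.append(json.dumps({"update": {"_index": index, "_id": str(doc_id)}}))
        # doc_as_upsert: create the document if missing, otherwise merge fields
        lines.append(json.dumps({"doc": doc, "doc_as_upsert": True}))
    return "\n".join(lines) + "\n"  # the _bulk API requires a trailing newline
```

For example, `rows_to_bulk("order_index", [{"_id": 1, "user_id": 7}])` yields an `update` action line for id "1" followed by a `doc` line containing only `user_id`.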
- cd /data/canal/canal.adapter-1.1.5/
- sh bin/startup.sh
- echo "vm.max_map_count=262144" >> /etc/sysctl.conf
- sysctl -p
ES cluster configuration across multiple servers
(master) node: add the following settings
- node.name: node-1
- cluster.initial_master_nodes: ["node-1"]
- network.host: 0.0.0.0
- network.publish_host: 0.0.0.0
- http.port: 9200
- transport.tcp.port: 9300
- #When adding a server, add its IP to seed_hosts
- discovery.seed_hosts: ["171.26.132.2", "171.26.132.5"]
- #Disable downloading the GeoIP database from the internet
- ingest.geoip.downloader.enabled: false
(slave) node: add the following settings
- node.name: node-2
- #cluster.initial_master_nodes: ["node-2"]
- network.host: 0.0.0.0
- network.publish_host: 0.0.0.0
- http.port: 9200
- transport.tcp.port: 9300
- #When adding a server, add its IP to seed_hosts
- discovery.seed_hosts: ["171.26.132.2", "171.26.132.5"]
- ingest.geoip.downloader.enabled: false
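The two node configurations above differ only in node.name and cluster.initial_master_nodes, while discovery.seed_hosts must be identical on every server. As a sketch that assumes you keep the author's scheme (only node-1 sets cluster.initial_master_nodes), a small hypothetical helper can render each server's settings so the seed list cannot drift between nodes:

```python
"""Render per-node elasticsearch.yml settings for the scheme above (sketch)."""
import json
from typing import List


def render_node_config(node_number: int, seed_ips: List[str]) -> str:
    """Return the settings block for node-<node_number>."""
    name = f"node-{node_number}"
    lines = [f"node.name: {name}"]
    if node_number == 1:
        # Only the bootstrap node names the initial master set in this scheme.
        lines.append(f'cluster.initial_master_nodes: ["{name}"]')
    lines += [
        "network.host: 0.0.0.0",
        "network.publish_host: 0.0.0.0",
        "http.port: 9200",
        "transport.tcp.port: 9300",
        # A JSON list is also valid YAML flow syntax.
        f"discovery.seed_hosts: {json.dumps(seed_ips)}",
        "ingest.geoip.downloader.enabled: false",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    ips = ["171.26.132.2", "171.26.132.5"]
    for number in (1, 2):
        print(render_node_config(number, ips))
```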
Running multiple ES instances on the same server
(master) configuration
- node.name: node-1
- cluster.initial_master_nodes: ["node-1"]
- network.host: 0.0.0.0
- network.publish_host: 0.0.0.0
- http.port: 9200
- transport.tcp.port: 9300
- discovery.seed_hosts: ["127.0.0.1", "127.0.0.1"]
(slave) configuration
- node.name: node-2
- #cluster.initial_master_nodes: ["node-1"]
- network.host: 0.0.0.0
- network.publish_host: 0.0.0.0
- #http.port: 9200
- #transport.tcp.port: 9300
- discovery.seed_hosts: ["127.0.0.1", "127.0.0.1"]
Setting an ES password (optional on an internal network)
- cd /data/canal/elasticsearch-7.16.0/
- # -d runs ES as a daemon (in the background)
- ./bin/elasticsearch -d
- Initialize order (full sync; params=0 matches every row because etlCondition is "where o.updated_at>={}"): curl --request POST --url http://localhost:8081/etl/es7/eskey/order.yml --form 'params=0' #eskey is the key: under canalAdapters: in canal.adapter-1.1.5/conf/application.yml
- List all indices: curl http://127.0.0.1:9200/_cat/indices
- Delete an index: curl -X DELETE http://127.0.0.1:9200/index_name?pretty
- Get the mapping: curl -X GET http://127.0.0.1:9200/index_name/_mapping?pretty
- Count the documents in an index: curl http://127.0.0.1:9200/index_name/_count?pretty
- Query a single id: curl http://127.0.0.1:9200/index_name/type_name/_search?pretty -X GET -H 'Content-Type:application/json' -d '{"query":{"match":{"_id":"16528875759937320"}}}'
The mapping can mirror the table structure or contain only some of its fields.
curl -X PUT http://127.0.0.1:9200/index_name?include_type_name=true -H 'Content-Type:application/json' -d '{"mappings":{"type_name":{"properties":{"id":{"type":"long"},"user_id":{"type":"long"},"user_name":{"type":"keyword"},"order_no":{"type":"keyword"},"product_id":{"type":"integer"},"order_status":{"type":"integer"},"status":{"type":"integer"},"created_at":{"type":"integer"},"updated_at":{"type":"integer"},"created_by":{"type":"keyword"},"updated_by":{"type":"keyword"}}}}}'
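Hand-assembled mapping JSON on a single line is easy to break with a stray quote or comma. A sketch of generating the request body with json.dumps instead, using the field names and types from the command above; ORDER_FIELDS and build_mapping_body are hypothetical names. With a type name the properties are nested under the type, which ES 7 accepts only together with the deprecated ?include_type_name=true; without one it produces the typeless ES 7 form.

```python
"""Build the PUT /<index> mapping body programmatically (sketch)."""
import json
from typing import Dict, Optional

# Field -> ES type, copied from the mapping in the curl command.
ORDER_FIELDS: Dict[str, str] = {
    "id": "long",
    "user_id": "long",
    "user_name": "keyword",
    "order_no": "keyword",
    "product_id": "integer",
    "order_status": "integer",
    "status": "integer",
    "created_at": "integer",
    "updated_at": "integer",
    "created_by": "keyword",
    "updated_by": "keyword",
}


def build_mapping_body(fields: Dict[str, str], type_name: Optional[str] = None) -> str:
    """Return the JSON body; nest under type_name only for include_type_name=true."""
    properties = {name: {"type": es_type} for name, es_type in fields.items()}
    mapping: dict = {"properties": properties}
    if type_name is not None:
        mapping = {type_name: mapping}
    return json.dumps({"mappings": mapping})


if __name__ == "__main__":
    print(build_mapping_body(ORDER_FIELDS, type_name="type_name"))
```

The printed string can be passed to curl with -d '...' or sent with any HTTP client.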
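6.6.1 Every 2 minutes, incrementally sync the data changed in the previous 15 minutes
params replaces the {} in etlCondition ("where o.updated_at>={}"), so pass the Unix timestamp of 15 minutes ago rather than 0 (params=0 re-syncs every row), and run the command from a script every 2 minutes (e.g. via cron):
curl --request POST --url http://localhost:8081/etl/es7/eskey/order.yml --form "params=$(( $(date +%s) - 900 ))"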
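The same incremental call as a small script, sketched under assumptions: updated_at holds Unix timestamps in seconds (it is an integer field in the mapping), the adapter listens on localhost:8081 as in the curl command, and the endpoint reads params as an ordinary form parameter (this sketch sends it form-urlencoded rather than multipart). window_start and trigger_etl are hypothetical names; scheduling every 2 minutes is left to cron or a similar scheduler.

```python
"""Trigger canal-adapter's ETL endpoint for the last N minutes (sketch)."""
import time
import urllib.parse
import urllib.request
from typing import Optional

ETL_URL = "http://localhost:8081/etl/es7/eskey/order.yml"


def window_start(minutes: int, now: Optional[float] = None) -> int:
    """Unix timestamp `minutes` ago; it fills {} in `where o.updated_at>={}`."""
    current = time.time() if now is None else now
    return int(current) - minutes * 60


def trigger_etl(params: str, url: str = ETL_URL, timeout: float = 60.0) -> str:
    """POST params=<value> (form-urlencoded) and return the response body."""
    data = urllib.parse.urlencode({"params": params}).encode()
    request = urllib.request.Request(url, data=data, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode()
```

Usage from a script run every 2 minutes: `print(trigger_etl(str(window_start(15))))`. Overlapping windows (15 minutes of data every 2 minutes) are harmless here because upsert: true makes re-writing the same rows idempotent.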
6.6.2 Scheduled verification
Periodically compare the full data set in the database with ES, find the records missing from ES, and add them back.
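One way to implement this check without loading either side all at once, sketched under assumptions: ids are positive integers, ES document _id values are the MySQL ids as strings (as produced by `o.id AS _id`), and fetch_db_ids / fetch_es_ids are hypothetical callbacks you would back with a paged `SELECT id ... WHERE id > ? ORDER BY id LIMIT ?` and an ES lookup by ids. The returned ids are the ones to re-index.

```python
"""Scheduled check: find ids present in MySQL but missing from ES (sketch)."""
from typing import Callable, Iterable, List, Set


def find_missing(db_ids: Iterable[int], es_ids: Iterable[str]) -> List[int]:
    """Return DB ids that have no ES document (ES _id values are strings)."""
    indexed: Set[str] = set(es_ids)
    return sorted(i for i in set(db_ids) if str(i) not in indexed)


def find_missing_in_batches(
    fetch_db_ids: Callable[[int, int], List[int]],
    fetch_es_ids: Callable[[List[int]], List[str]],
    batch_size: int = 1000,
) -> List[int]:
    """Walk the table by id range, comparing one batch at a time.

    fetch_db_ids(after_id, limit) -> next ids greater than after_id, ascending
    fetch_es_ids(ids)             -> the subset of ids (as strings) found in ES
    """
    missing: List[int] = []
    after = 0  # assumes positive integer primary keys
    while True:
        batch = fetch_db_ids(after, batch_size)
        if not batch:
            return missing
        missing.extend(find_missing(batch, fetch_es_ids(batch)))
        after = batch[-1]  # continue after the last id of this batch
```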