赞
踩
目录
3.3. CanalLogPositionManager 设计
Canal 是阿里开源的一款基于 MySql 数据库 binlog 的增量订阅和消费组件,通过它可以订阅数据库的 binlog 日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。
Canal 主要用途是对 MySql 数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对 MySql 的增量数据进行实时同步,支持同步到 MySql、ElasticSearch、HBase 等数据存储中去。
由上面两张图片可知:
1. 同步缓存 Redis/全文搜索 ES:Canal 一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过 binlog 进行缓存/ES 的增量更新。当缓存/ES 更新出现问题时,应该回退 binlog 到过去某个位置进行重新同步,并提供全量刷新缓存/ES 的方法。
2. 下发任务:另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入 MQ(比如 Kafka) 进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过 MQ 发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发 MQ 的代码,从而实现了下发归集。
3. 数据异构:在大型网站架构中,DB 都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。所谓的数据异构,那就是将需要 join 查询的多表按照某一个维度又聚合在一个DB 中,让你去查询。Canal 就是实现数据异构的手段之一。
Server 代表一个 Canal 运行实例,对应于一个 JVM。
Instance 对应于一个数据队列(1个 Canal Server 对应 1..n 个 Instance),Instance 下的子模块:
整体类图设计:
每个 EventParser 都会关联两个内部组件:
目前开源版本只支持 MySql binlog , 默认通过 MySql binlog dump 远程获取 binlog,但也可以使用 LocalBinlog - 类 relay log 模式,直接消费本地文件中的 binlog。
数据 1:n 业务:
为了合理的利用数据库资源, 一般常见的业务都是按照 schema 进行隔离,然后在 MySql 上层或者 dao 这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过 cobar/tddl 来解决数据源路由问题。所以,一般一个数据库实例上,会部署多个 schema,每个 schema 会有1个或者多个业务方关注。
数据 n:1 业务:
同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个 Store 进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局Id 进行排序归并。
1. 抽象 CanalStoreScavenge , 解决数据的清理,比如定时清理,满了之后清理,每次 ack 清理等。
2. CanalEventStore 接口,主要包含 put/get/ack/rollback 的相关接口。put/get 操作会组成一个生产者/消费者模式,每个 Store 都会有存储大小设计,存储满了,put 操作会阻塞等待 get 获取数据,所以不会无线占用存储,比如内存大小:
在了解具体 API 之前,需要提前了解下 Canal Client 的类设计,这样才可以正确的使用好 Canal。
大致分为几部分:
get/ack/rollback 协议介绍:
Canal 的 get/ack/rollback 协议和常规的 jms 协议有所不同,允许 get/ack 异步处理,比如可以连续调用 get 多次,后续异步按顺序提交 ack/rollback,项目中称之为流式 API。
流式 API 设计的好处:
流式 API 设计:
流式 API 带来的异步响应模型:
Canal 配置方式有两种:
Spring 配置:
Spring 配置的原理是将整个配置抽象为两部分:
通过 Spring 的 PropertyPlaceholderConfigurer 机制将其融合,生成一份 Instance 实例对象,每个Instance 对应的组件都是相互独立的,互不影响。
properties 配置文件分为两部分:
Canal 配置主要分为两部分定义:
1. instance 列表定义,(列出当前 Server 上有多少个 Instance,每个 Instance 的加载方式是Spring/Manager 等) 以下选一些重要的参数说明一下:
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.auto.scan | 开启instance自动扫描 如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发: a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动 b. instance目录删除:卸载对应instance配置,如已启动则进行关闭 c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作 | true |
canal.instance.global.spring.xml | 全局的spring配置方式的组件文件 | lasspath:spring/memory-instance.xml (spring目录相对于canal.conf.dir) |
2. common 参数定义,比如可以将 instance.properties 的公用参数,抽取放置到这里,这样每个Instance 启动的时候就可以共享。【instance.properties 配置定义优先级高于 canal.properties】以下选一些重要的参数说明一下:
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.register.ip | canal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip) | 无 |
canal.zookeeper.flush.period | canal持久化数据到zookeeper上的更新频率,单位毫秒 | 1000 |
canal.instance.memory.batch.mode | canal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小 | MEMSIZE |
canal.instance.memory.buffer.size | canal内存store中可缓存buffer记录数,需要为2的指数 | 16384 |
canal.instance.memory.buffer.memunit | 内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小 | 1024 |
canal.instance.filter.druid.ddl | 是否使用druid处理所有的ddl解析来获取库和表名 | true |
canal.instance.filter.query.dml | 是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档) | false |
canal.instance.parser.parallel | 是否开启binlog并行解析模式 (串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+) | true |
canal.admin.manager | canal链接canal-admin的地址 (v1.1.4新增) | 无 |
在 canal.properties 定义了 canal.destinations 后,需要在 canal.conf.dir 对应的目录下建立同名的文件。
如果 canal.properties 未定义 instance 列表,但开启了 canal.auto.scan 时:
instance.properties 参数列表(部分):
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.instance.mysql.slaveId | mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 (v1.1.x版本之后canal会自动生成,不需要手工指定) | 无 |
canal.instance.filter.regex | mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
1. 所有表:.* or .*\\..* 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔) | .*\\..* |
canal.instance.filter.black.regex | mysql 数据解析表的黑名单,表达式规则见白名单的规则 | 无 |
canal.instance.master.journal.name | mysql主库链接时起始的binlog文件 | 无 |
canal.instance.master.position | mysql主库链接时起始的binlog偏移量 | 无 |
canal.instance.master.timestamp | mysql主库链接时起始的binlog的时间戳 | 无 |
几点说明:
1. MySql 链接时的起始位置
2. MySql 解析关注表定义
3. MySql 链接的编码
目前默认支持的 instance.xml 有以下几种:
在介绍 instance 配置之前,先了解一下 Canal 如何维护一份增量订阅&消费的关系信息:
对应的两个位点组件,目前都有几种实现:
memory-instance.xml 介绍:
所有的组件(parser、sink、store)都选择了内存版模式,记录位点的都选择了 Memory 模式,重启后又会回到初始位点进行解析。
特点:速度最快,依赖最少(不需要 Zookeeper)。
场景:一般应用在 quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境。
default-instance.xml 介绍:
Store 选择了内存模式,其余的 parser/sink 依赖的位点管理选择了持久化模式,目前持久化的方式主要是写入 Zookeeper,保证数据集群共享。
特点:支持 HA。
场景:生产环境,集群化部署。
group-instance.xml 介绍:
主要针对需要进行多库合并时,可以将多个物理 Instance 合并为一个逻辑 Instance,提供客户端访问。
场景:分库业务。比如产品数据拆分了4个库,每个库会有一个 Instance,如果不用 Group,业务上要消费数据时,需要启动4个客户端,分别链接4个 Instance 实例。使用 Group后,可以在 Canal Server 上合并为一个逻辑 Instance,只需要启动1个客户端,链接这个逻辑 Instance即可。
接下来我们来学习下 Canal 的使用,以 MySql 实时同步数据到 ElasticSearch为例。
首先我们需要下载 Canal 的各个组件 canal-server、canal-adapter、canal-admin。
下载地址:https://github.com/alibaba/canal/releases。
Canal 官方文档:https://github.com/alibaba/canal/wiki。
Canal 的各个组件的用途各不相同,下面分别介绍下:
由于不同版本的 MySql、ElasticSearch 和 Canal 会有兼容性问题,所以我们先对其使用版本做个约定:
应用 | 端口 | 版本 |
---|---|---|
MySql | 3306 | 5.7 |
ElasticSearch | 9200 | 7.6.2 |
Kibanba | 5601 | 7.6.2 |
canal-server | 11111 | 1.1.15 |
canal-adapter | 8081 | 1.1.15 |
canal-admin | 8089 | 1.1.15 |
由于 Canal 是通过订阅 MySql 的 binlog 来实现数据同步的,所以我们需要开启 MySql 的 binlog写入功能,并设置 binlog-format 为 ROW 模式,我的配置文件为 /mydata/mysql/conf/my.cnf,改为如下内容即可:
- [mysqld]
- ## 设置server_id,同一局域网中需要唯一
- server_id=101
- ## 指定不需要同步的数据库名称
- binlog-ignore-db=mysql
- ## 开启二进制日志功能
- log-bin=mall-mysql-bin
- ## 设置二进制日志使用内存大小(事务)
- binlog_cache_size=1M
- ## 设置使用的二进制日志格式(mixed,statement,row)
- binlog_format=row
- ## 二进制日志过期清理时间。默认值为0,表示不自动清理。
- expire_logs_days=7
- ## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
- ## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致
- slave_skip_errors=1062
配置完成后需要重新启动 MySql,重启成功后通过如下命令查看 binlog 是否启用:
- show variables like '%log_bin%'
- +---------------------------------+-------------------------------------+
- | Variable_name | Value |
- +---------------------------------+-------------------------------------+
- | log_bin | ON |
- | log_bin_basename | /var/lib/mysql/mall-mysql-bin |
- | log_bin_index | /var/lib/mysql/mall-mysql-bin.index |
- | log_bin_trust_function_creators | OFF |
- | log_bin_use_v1_row_events | OFF |
- | sql_log_bin | ON |
- +---------------------------------+-------------------------------------+
再查看下 MySql 的 binlog 模式:
- show variables like 'binlog_format%';
- +---------------+-------+
- | Variable_name | Value |
- +---------------+-------+
- | binlog_format | ROW |
- +---------------+-------+
接下来需要创建一个拥有从库权限的账号,用于订阅 binlog,这里创建的账号为 canal:canal:
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- FLUSH PRIVILEGES;
创建好测试用的数据库 canal-test,之后创建一张商品表 product,建表语句如下:
- CREATE TABLE `product` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT,
- `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
- `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
- `price` decimal(10, 2) NULL DEFAULT NULL,
- `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
- PRIMARY KEY (`id`) USING BTREE
- ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
将我们下载好的压缩包 canal.deployer-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,然后解压到指定目录 /mydata/canal-server,可使用如下命令解压:
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz
解压完成后目录结构如下:
- ├── bin
- │ ├── restart.sh
- │ ├── startup.bat
- │ ├── startup.sh
- │ └── stop.sh
- ├── conf
- │ ├── canal_local.properties
- │ ├── canal.properties
- │ └── example
- │ └── instance.properties
- ├── lib
- ├── logs
- │ ├── canal
- │ │ └── canal.log
- │ └── example
- │ ├── example.log
- │ └── example.log
- └── plugin
修改配置文件 conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置:
- # 需要同步数据的MySQL地址
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.master.journal.name=
- canal.instance.master.position=
- canal.instance.master.timestamp=
- canal.instance.master.gtid=
- # 用于同步数据的数据库账号
- canal.instance.dbUsername=canal
- # 用于同步数据的数据库密码
- canal.instance.dbPassword=canal
- # 数据库连接编码
- canal.instance.connectionCharset = UTF-8
- # 需要订阅binlog的表过滤正则表达式
- canal.instance.filter.regex=.*\\..*
使用 startup.sh 脚本启动 canal-server 服务:
sh bin/startup.sh
启动成功后可使用如下命令查看服务日志信息:
- tail -f logs/canal/canal.log
- 2020-10-26 16:18:13.354 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
- 2020-10-26 16:18:19.978 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is r
启动成功后可使用如下命令查看 instance 日志信息:
- tail -f logs/example/example.log
- 2020-10-26 16:18:16.056 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
- 2020-10-26 16:18:16.061 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
- 2020-10-26 16:18:18.259 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
- 2020-10-26 16:18:18.282 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
- 2020-10-26 16:18:18.282 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
- 2020-10-26 16:18:19.543 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
- 2020-10-26 16:18:19.578 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
- 2020-10-26 16:18:19.912 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
- {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mall-mysql-bin.000006","position":2271,"serverId":101,"timestamp":1603682664000}}
- 2020-10-26 16:18:22.435 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbou
如果想要停止 canal-server 服务可以使用如下命令:
sh bin/stop.sh
将我们下载好的压缩包 canal.adapter-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,然后解压到指定目录 /mydata/canal-adpter,解压完成后目录结构如下:
- ├── bin
- │ ├── adapter.pid
- │ ├── restart.sh
- │ ├── startup.bat
- │ ├── startup.sh
- │ └── stop.sh
- ├── conf
- │ ├── application.yml
- │ ├── es6
- │ ├── es7
- │ │ ├── biz_order.yml
- │ │ ├── customer.yml
- │ │ └── product.yml
- │ ├── hbase
- │ ├── kudu
- │ ├── logback.xml
- │ ├── META-INF
- │ │ └── spring.factories
- │ └── rdb
- ├── lib
- ├── logs
- │ └── adapter
- │ └── adapter.log
- └── plugin
修改配置文件 conf/application.yml,按如下配置即可,主要是修改 canal-server 配置、数据源配置和客户端适配器配置:
- canal.conf:
- mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
- flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
- zookeeperHosts: # 对应集群模式下的zk地址
- syncBatchSize: 1000 # 每次同步的批数量
- retries: 0 # 重试次数, -1为无限重试
- timeout: # 同步超时时间, 单位毫秒
- accessKey:
- secretKey:
- consumerProperties:
- # canal tcp consumer
- canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
- canal.tcp.zookeeper.hosts:
- canal.tcp.batch.size: 500
- canal.tcp.username:
- canal.tcp.password:
-
- srcDataSources: # 源数据库配置
- defaultDS:
- url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true
- username: canal
- password: canal
- canalAdapters: # 适配器列表
- - instance: example # canal实例名或者MQ topic名
- groups: # 分组列表
- - groupId: g1 # 分组id, 如果是MQ模式将用到该值
- outerAdapters:
- - name: logger # 日志打印适配器
- - name: es7 # ES同步适配器
- hosts: 127.0.0.1:9200 # ES连接地址
- properties:
- mode: rest # 模式可选transport(9300) 或者 rest(9200)
- # security.auth: test:123456 # only used for rest mode
- cluster.name: elasticsearch # ES集群名称
添加配置文件 canal-adapter/conf/es7/product.yml,用于配置 MySql 中的表与 ElasticSearch 中索引的映射关系:
- dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
- destination: example # canal的instance或者MQ的topic
- groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
- esMapping:
- _index: canal_product # es 的索引名称
- _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
- sql: "SELECT
- p.id AS _id,
- p.title,
- p.sub_title,
- p.price,
- p.pic
- FROM
- product p" # sql映射
- etlCondition: "where a.c_time>={}" #etl的条件参数
- commitBatch: 3000 # 提交批大小
使用 startup.sh 脚本启动 canal-adapter 服务:
sh bin/startup.sh
启动成功后可使用如下命令查看服务日志信息:
- tail -f logs/adapter/adapter.log
- 20-10-26 16:52:55.148 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
- 2020-10-26 16:52:57.005 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
- 2020-10-26 16:52:57.376 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
- 2020-10-26 16:52:58.615 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
- 2020-10-26 16:52:58.651 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /mydata/canal-adapter/plugin
- 2020-10-26 16:52:59.043 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
- 2020-10-26 16:52:59.044 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
- 2020-10-26 16:52:59.057 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
- 2020-10-26 16:52:59.100 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
- 2020-10-26 16:52:59.153 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
- 2020-10-26 16:52:59.590 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
- 2020-10-26 16:52:59.626 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 31.278 seconds (JVM running for 33.99)
- 2020-10-26 16:52:59.930 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
如果需要停止 canal-adapter 服务可以使用如下命令:
sh bin/stop.sh
将我们下载好的压缩包 canal.admin-1.1.5-SNAPSHOT.tar.gz 上传到 Linux 服务器,然后解压到指定目录 /mydata/canal-admin,解压完成后目录结构如下:
- ├── bin
- │ ├── restart.sh
- │ ├── startup.bat
- │ ├── startup.sh
- │ └── stop.sh
- ├── conf
- │ ├── application.yml
- │ ├── canal_manager.sql
- │ ├── canal-template.properties
- │ ├── instance-template.properties
- │ ├── logback.xml
- │ └── public
- │ ├── avatar.gif
- │ ├── index.html
- │ ├── logo.png
- │ └── static
- ├── lib
- └── logs
创建 canal-admin 需要使用的数据库 canal_manager,创建 SQL 脚本为 /mydata/canal-admin/conf/canal_manager.sql,会创建如下表:
修改配置文件 conf/application.yml,按如下配置即可,主要是修改数据源配置和 canal-admin 的管理账号配置,注意需要用一个有读写权限的数据库账号,比如管理账号 root:root:
- server:
- port: 8089
- spring:
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
-
- spring.datasource:
- address: 127.0.0.1:3306
- database: canal_manager
- username: root
- password: root
- driver-class-name: com.mysql.jdbc.Driver
- url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
- hikari:
- maximum-pool-size: 30
- minimum-idle: 1
-
- canal:
- adminUser: admin
- adminPasswd: admin
接下来对之前搭建的 canal-server 的 conf/canal_local.properties 文件进行配置,主要是修改canal-admin 的配置,修改完成后使用 sh bin/startup.sh local 重启 canal-server:
- # register ip
- canal.register.ip =
-
- # canal admin config
- canal.admin.manager = 127.0.0.1:8089
- canal.admin.port = 11110
- canal.admin.user = admin
- canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
- # admin auto register
- canal.admin.register.auto = true
- canal.admin.register.cluster =
使用 startup.sh 脚本启动 canal-admin 服务:
sh bin/startup.sh
启动成功后可使用如下命令查看服务日志信息:
- tail -f logs/admin.log
- 020-10-27 10:15:04.210 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
- 2020-10-27 10:15:04.308 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
- 2020-10-27 10:15:04.534 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
- 2020-10-27 10:15:04.573 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 31.203 seconds (JVM running for 34.865)
访问 canal-admin 的 Web 界面,输入账号密码 admin:123456 即可登录,访问地址:http://192.168.3.101:8089
登录成功后即可使用 Web 界面操作 canal-server:
经过上面的一系列步骤,Canal 的数据同步功能已经基本可以使用了,下面我们来演示下数据同步功能。
首先我们需要在 ElasticSearch 中创建索引,和 MySql 中的 product 表相对应,直接在 Kibana 的Dev Tools 中使用如下命令创建即可:
- PUT canal_product
- {
- "mappings": {
- "properties": {
- "title": {
- "type": "text"
- },
- "sub_title": {
- "type": "text"
- },
- "pic": {
- "type": "text"
- },
- "price": {
- "type": "double"
- }
- }
- }
- }
创建完成后可以查看下索引的结构:
之后使用如下 SQL 语句在数据库中创建一条记录:
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 5, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL
创建成功后,在 ElasticSearch 中搜索下,发现数据已经同步了:
再使用如下 SQL 对数据进行修改:
UPDATE product SET title='小米10' WHERE id=5
修改成功后,在 ElasticSearch 中搜索下,发现数据已经修改了:
再使用如下 SQL 对数据进行删除操作:
DELETE FROM product WHERE id=5
删除成功后,在 ElasticSearch 中搜索下,发现数据已经删除了,至此 MySql 同步到 ElasticSearch的功能完成了!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。