赞
踩
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
canal一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法,如下图所示:
另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/kafka进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过MQ发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。
在大型网站架构中,DB都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。
所谓的数据异构,那就是将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。canal就是实现数据异构的手段之一。
Releases · alibaba/canal · GitHub
mysql主同步开启
## 设置server_id,同一局域网中需要唯一 server-id=101 log-bin=/var/lib/mysql/mysql-bin binlog-do-db=canal ## 开启二进制日志功能 log-bin=mall-mysql-bin ## 设置二进制日志使用内存大小(事务) binlog_cache_size=1M ## 设置使用的二进制日志格式(mixed,statement,row) binlog_format=row ## 二进制日志过期清理时间。默认值为0,表示不自动清理。 expire_logs_days=7 ## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。 ## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致,1046表结构不存在 slave_skip_errors=1062 ##同步的库名(多个库可以写多行,不指定就同步所有) binlog_do_db = custvideo binlog_do_db = test_canal ##忽略的库 binlog_ignore_db = mysql ##同步的库的表名 replicate-wild-do-table = custvideo.mac_cust_video
重启mysql服务
创建数据库用户及权限
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
canal-deployer配置
最新下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
mkdir canal-deployer && tar -xvzf canal.deployer-1.1.6.tar.gz -C canal-deployer
################################################# ## mysql serverId , v1.0.26+ will autoGen ## 开启从服务器ID canal.instance.mysql.slaveId=1111 # enable gtid use true/false canal.instance.gtidon=false # position info #需要连接的数据库地址及端口 canal.instance.master.address=127.0.0.1:3306 #需要读取的起始的binlog文件 canal.instance.master.journal.name= #需要读取的起始的binlog文件的偏移量 canal.instance.master.position= #需要读取的起始的binlog的时间戳 canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex #mysql 数据解析关注的表,Perl正则表达式. #多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) #常见例子: #1. 所有表:.* or .*\\..* #2. canal schema下所有表: canal\\..* #3. canal下的以canal打头的表:canal\\.canal.* #4. canal schema下的一张表:canal.test1 #5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)# #注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤) canal.instance.filter.regex=test_canal # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.enableDynamicQueuePartition=false #canal.mq.partitionsNum=3 #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
常用配置正则表达式:
mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 常见例子: 1. 所有表:.* or .*\\..* 2. canal schema下所有表: canal\\..* 3. canal下的以canal打头的表:canal\\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔) 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
主要修改位置
# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
################################################# ######### common argument ############# ################################################# # tcp bind ip #canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行 canal.ip = # register ip to zookeeper canal.register.ip = #canal server提供socket tcp服务的端口 canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register #canal.admin.register.auto = true #canal.admin.register.cluster = #canal.admin.register.name = canal.zkServers = # flush data to zk #canal持久化数据到zookeeper上的更新频率,单位毫秒 canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = tcp # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60
启动:
sh bin/startup.sh
安装步骤比较简单,下载->解压->配置->导数据库->启动(sh bin/startup.sh)
说了整合,那就是canal-admin和canal-server一起,所以说,canal-server的配置也要改的喔
我们可以选择更改canal-server的canal_local.properties的配置覆盖canal.properties
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: root password: root driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
canal_local.properties配置文件
# register ip 注册ip canal.register.ip = # canal admin config 连接的canal-admin的信息 canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster =
我们来说明一下,这个user和passwd,其实就是我们在上面canal-admin配置文件看到的用户名和密码,只不过这个passwd,是加密过的
canal.admin.register.auto:自动注册的意思,如果没有配置,canal-server 启动后需要自行在 canal-server 上面添加
canal.admin.register.cluster:这个配置如果不写代表当前的 canal-server 是一个单机节点,如果添加的名字在 canal-admin 上面没有提前注册,canal-server 启动时会报错
注:关于canal.admin.passwd密码配置的说明,针对canal.admin.passwd,默认做了密码加密处理,这里的passwd是一个密文和canal-admin里application.yml里的密码原文做对应。
密文的生成方式,请登录mysql,执行如下密文生成sql即可(记得去掉第一个首字母的星号)
改完之后,启动canal-server,启动的时候,记得带参数,指定local配置
命令:sh bin/startup.sh local
canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request
canal java 客户端: ClientExample · alibaba/canal Wiki · GitHub
canal c# 客户端: GitHub - dotnetcore/CanalSharp: Alibaba mysql database binlog subscription & consumer components Canal's .NET client.
canal Python客户端:GitHub - bithaolee/canal-python: alibaba canal 客户端(Python3 版本)
canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力
namespace xingwenge\canal_php\sample; use xingwenge\canal_php\CanalClient; use xingwenge\canal_php\CanalConnectorFactory; use xingwenge\canal_php\Fmt; require_once './vendor/autoload.php'; ini_set('display_errors', 'On'); error_reporting(E_ALL); try { $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE); $client->connect("127.0.0.1", 11111); $client->checkValid(); $client->subscribe("1002", "test", ".*\\..*"); # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤 while (true) { $message = $client->get(1); if ($entries = $message->getEntries()) { foreach ($entries as $entry) { Fmt::println($entry); } }else{ echo '没取到'.PHP_EOL; } sleep(1); } $client->disConnect(); } catch (\Exception $e) { echo $e->getMessage(), PHP_EOL; }
了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal.
大致分为几部分:
ClientIdentity
canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)
CanalConnector
SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制
CanalNodeAccessStrategy
SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.
ClientRunningMonitor/ClientRunningListener/ClientRunningData
client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.
数据结构说明:ClientAPI · alibaba/canal Wiki · GitHub
Entry Header logfileName [binlog文件名] logfileOffset [binlog position] executeTime [binlog里记录变更发生的时间戳,精确到秒] schemaName tableName eventType [insert/update/delete类型] entryType [事务头BEGIN/事务尾END/数据ROWDATA] storeValue [byte数据,可展开,对应的类型为RowChange]RowChange isDdl [是否是ddl变更操作,比如create table/drop table] sql [具体的ddl sql] rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理] beforeColumns [Column类型的数组,变更前的数据字段] afterColumns [Column类型的数组,变更后的数据字段] Column index sqlType [jdbc type] name [column name] isKey [是否为主键] updated [是否发生过变更] isNull [值是否为null] value [具体的内容,注意为string文本]
其他链接:
MYSQL8.0mysql-connector-java的jar包下载地址:
不同版本mysql-connector-java的jar包下载地址-CSDN博客
MySQL :: Download MySQL Connector/J (Archived Versions)
Canal-Admin-QuickStart
Canal Admin Guide
Canal Admin ServerGuide
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。