当前位置:   article > 正文

canal将数据同步到es、mq等实践_canal 同步es

canal 同步es

简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

687474703a2f2f646c2e69746579652e636f6d2f75706c6f61642f6174746163686d656e742f303038302f333038362f34363863316131342d653761642d333239302d396433642d3434616335303161373232372e6a7067.jpg

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

应用场景

1、同步缓存redis/全文搜索ES

canal一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法,如下图所示:

13587608-871628bb7897f40b.png

2、下发任务

另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/kafka进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过MQ发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。

13587608-f102d71aacf0d302.png

3、数据异构

在大型网站架构中,DB都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。

所谓的数据异构,那就是将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。canal就是实现数据异构的手段之一。

13587608-871628bb7897f40b.png

下载地址:

Releases · alibaba/canal · GitHub

配置步骤:

  • mysql主同步开启

## 设置server_id,同一局域网中需要唯一
server-id=101
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=canal
## 开启二进制日志功能
log-bin=mall-mysql-bin
## 设置二进制日志使用内存大小(事务)
binlog_cache_size=1M
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row
## 二进制日志过期清理时间。默认值为0,表示不自动清理。
expire_logs_days=7
## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致,1046表结构不存在
slave_skip_errors=1062
##同步的库名(多个库可以写多行,不指定就同步所有)
binlog_do_db = custvideo
binlog_do_db = test_canal
##忽略的库
binlog_ignore_db = mysql
##同步的库的表名
replicate-wild-do-table = custvideo.mac_cust_video
  • 重启mysql服务

图片.png

图片.png

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
  •     canal-deployer配置

最新下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

mkdir canal-deployer && tar -xvzf canal.deployer-1.1.6.tar.gz -C  canal-deployer

图片.png

图片.png

conf/example/instance.properties 配置(instance级别的配置文件,每个instance一份)

################################################# 
## mysql serverId , v1.0.26+ will autoGen 
## 开启从服务器ID
canal.instance.mysql.slaveId=1111 
 
# enable gtid use true/false 
canal.instance.gtidon=false 
 
# position info
#需要连接的数据库地址及端口
canal.instance.master.address=127.0.0.1:3306
#需要读取的起始的binlog文件
canal.instance.master.journal.name=
#需要读取的起始的binlog文件的偏移量
canal.instance.master.position=
#需要读取的起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#mysql 数据解析关注的表,Perl正则表达式.
#多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
#常见例子:
#1.  所有表:.*   or  .*\\..*
#2.  canal schema下所有表: canal\\..*
#3.  canal下的以canal打头的表:canal\\.canal.*
#4.  canal schema下的一张表:canal.test1
#5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)#
#注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
canal.instance.filter.regex=test_canal
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

常用配置正则表达式:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

主要修改位置

# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*

conf/canal.properties 配置(系统根配置文件)

#################################################
#########               common argument         #############
#################################################
# tcp bind ip
#canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行
canal.ip =
# register ip to zookeeper
canal.register.ip =
#canal server提供socket tcp服务的端口
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
#canal持久化数据到zookeeper上的更新频率,单位毫秒
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

启动:

sh bin/startup.sh

图片.png

canal-admin与canal-server的整合

安装步骤比较简单,下载->解压->配置->导数据库->启动(sh bin/startup.sh)

说了整合,那就是canal-admin和canal-server一起,所以说,canal-server的配置也要改的喔

我们可以选择更改canal-server的canal_local.properties的配置覆盖canal.properties

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

图片.png

canal_local.properties配置文件

# register ip  注册ip
canal.register.ip =
 
# canal admin config  连接的canal-admin的信息
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

我们来说明一下,这个user和passwd,其实就是我们在上面canal-admin配置文件看到的用户名和密码,只不过这个passwd,是加密过的

canal.admin.register.auto:自动注册的意思,如果没有配置,canal-server 启动后需要自行在 canal-server 上面添加

canal.admin.register.cluster:这个配置如果不写代表当前的 canal-server 是一个单机节点,如果添加的名字在 canal-admin 上面没有提前注册,canal-server 启动时会报错

注:关于canal.admin.passwd密码配置的说明,针对canal.admin.passwd,默认做了密码加密处理,这里的passwd是一个密文和canal-admin里application.yml里的密码原文做对应。
密文的生成方式,请登录mysql,执行如下密文生成sql即可(记得去掉第一个首字母的星号)

图片.png

改完之后,启动canal-server,启动的时候,记得带参数,指定local配置

命令:sh bin/startup.sh local

canal-server与client模式:

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑,欢迎大家提交 pull request

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力

namespace xingwenge\canal_php\sample;

use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;

require_once './vendor/autoload.php';

ini_set('display_errors', 'On');
error_reporting(E_ALL);

try {
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

    $client->connect("127.0.0.1", 11111);
    $client->checkValid();
    $client->subscribe("1002", "test", ".*\\..*");
    # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤

    while (true) {
        $message = $client->get(1);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }else{
         echo '没取到'.PHP_EOL;
        }
        sleep(1);
    }

    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

图片.png

了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal.

大致分为几部分:

  • ClientIdentity
    canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)

  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector :   两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制

  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点.   保证整个系统的高可用性.

数据结构说明:ClientAPI · alibaba/canal Wiki · GitHub

Entry  
    Header  
        logfileName [binlog文件名]  
        logfileOffset [binlog position]  
        executeTime [binlog里记录变更发生的时间戳,精确到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete类型]  
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
    storeValue  [byte数据,可展开,对应的类型为RowChange]RowChange
isDdl       [是否是ddl变更操作,比如create table/drop table]
sql         [具体的ddl sql]
rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType     [jdbc type]
name        [column name]
isKey       [是否为主键]
updated     [是否发生过变更]
isNull      [值是否为null]
value       [具体的内容,注意为string文本]

其他链接:

相关配置明细说明: https://chengyanan.blog.csdn.net/article/details/90445323?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-90445323-blog-124649679.pc_relevant_default&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-90445323-blog-124649679.pc_relevant_default&utm_relevant_index=1

MYSQL8.0mysql-connector-java的jar包下载地址:

    不同版本mysql-connector-java的jar包下载地址-CSDN博客

    MySQL :: Download MySQL Connector/J (Archived Versions)

Canal-Admin-QuickStart
Canal Admin Guide
Canal Admin ServerGuide

Canal properties配置文件参数含义(1) - Pang个人博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/497249
推荐阅读
相关标签
  

闽ICP备14008679号