赞
踩
canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)。使用场景包括:
1.缓存更新
2.异步数据库或者同步到关系型数据库的中间媒介
基于日志增量订阅&消费支持的业务:
这里也介绍了业务cache刷新和价格变化等重要数据变更消息的监听。
Canal原理相对比较简单:
Canal架构及工作原理
先在Docker Hub中下载canal-server镜像
- docker pull canal/canal-server:latest
- 复制代码
先启动Canal,用于复制properties配置文件
- docker run -p 11111:11111 --name canal -d canal/canal-server:latest
- 复制代码
初次启动Canal镜像后,将instance.properties文件复制到宿主机,用于后续挂载使用
- docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/
- 复制代码
修改instance.properties,该文件主要配置监听的mysql实例
- #################################################
- ## mysql serverId , v1.0.26+ will autoGen
- # canal.instance.mysql.slaveId=0
-
- # enable gtid use true/false 未开启gtid主从同步
- canal.instance.gtidon=false
-
- # position info 在同一宿主机内 若有主从数据库,填写主数据库地址
- canal.instance.master.address=172.17.0.1:3306
- #需要读取的起始的binlog文件 不填写的话默认应该是从最新的Binlog开始监听
- canal.instance.master.journal.name=
- #需要读取的起始的binlog文件的偏移量
- canal.instance.master.position=
- #需要读取的起始的binlog的时间戳
- canal.instance.master.timestamp=
- canal.instance.master.gtid=
-
- # rds oss binlog
- canal.instance.rds.accesskey=
- canal.instance.rds.secretkey=
- canal.instance.rds.instanceId=
-
- # table meta tsdb info
- canal.instance.tsdb.enable=true
- #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
- #canal.instance.tsdb.dbUsername=canal
- #canal.instance.tsdb.dbPassword=canal
-
- # 从数据库地址 主备切换时使用
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- #canal.instance.standby.gtid=
-
- # username/password
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset = UTF-8
- # enable druid Decrypt database password
- canal.instance.enableDruid=false
- #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
-
- # table regex
- canal.instance.filter.regex=.*\..*
- # table black regex 不需要监听的名单
- canal.instance.filter.black.regex=mysql\..*,sys\..*,performance_schema\..*,information_schema\..*
- # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
- #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
- # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
- #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
-
- # mq config 默认的sql存储队列
- canal.mq.topic=example
- # dynamic topic route by schema or table regex
- #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*
- canal.mq.partition=0
- # hash partition config
- #canal.mq.enableDynamicQueuePartition=false
- #canal.mq.partitionsNum=3
- #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
- #canal.mq.partitionHash=test.table:id^name,.*\..*
- #################################################
-
- 复制代码
Canal为我们提供了canal.instance.filter.regex与canal.instance.filter.black.regex选项参数来过滤数据库表数据解析,类似黑白名单。常见例子有: ●所有表:.* or ... ●canal schema下所有表:canal..* ●canal下的以canal打头的表:canal.canal.* ●canal schema下的一张表:canal.test1 ●多个规则组合使用:canal..*,mysql.test1,mysql.test2 (逗号分隔)
修改canal.properties,该文件主要时配置canal server
- #################################################
- ######### destinations #############
- #################################################
- ##配置监听多数据实例的地方 单数据库监听的话这里配置example就可以
- canal.destinations = example
- # conf root dir
- canal.conf.dir = ../conf
- # auto scan instance dir add/remove and start/stop instance
- canal.auto.scan = true
- canal.auto.scan.interval = 5
- # set this value to 'true' means that when binlog pos not found, skip to latest.
- # WARN: pls keep 'false' in production env, or if you know what you want.
- canal.auto.reset.latest.pos.mode = false
-
- canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
- #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
-
- canal.instance.global.mode = spring
- canal.instance.global.lazy = false
- canal.instance.global.manager.address = ${canal.admin.manager}
- #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
- canal.instance.global.spring.xml = classpath:spring/file-instance.xml
- #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
-
- # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 选择的消费队列
- canal.serverMode = tcp
- 复制代码
消费队列模式与Server-client模式一致,主要区别如下:
这种模式相比于Server-client模式
下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等
会增加系统的复杂度,增加一些延迟
- #本地的instance.properties:容器的instance.properties 将容器的instance.properties配置文件挂载到宿主机,方便后续变更
- docker stop canal;docker rm canal; 重新生成容器
- docker run -p 11111:11111 --name canal -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server:latest
- 复制代码
查看消费实例example的日志可以看出canal监听的binlog位置正好是连接时的binlog位置,前提是未指定了Binlog的位置。客户端开始连接后便可以从指定位置开始消费增量的binlog。binlog-format=ROW # 选择 ROW 模式
1.引入pom文件
- <!--canal-->
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.5</version>
- </dependency>
-
- <!-- Message、CanalEntry.Entry等来自此安装包 -->
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.protocol</artifactId>
- <version>1.1.5</version>
- </dependency>
-
- 复制代码
2.application.yml配置文件canal
- canal:
- serverAddress: 42.192.183.193
- serverPort: 11111
- instance: #多个instance
- - example
- 复制代码
对应的properties文件
- @Component
- @ConfigurationProperties(prefix = "canal")
- @Data
- public class CanalInstanceProperties {
-
- /**
- * canal server地址
- */
- private String serverAddress;
-
- /**
- * canal server端口
- */
- private Integer serverPort;
-
- /**
- * canal 监听实例
- */
- private Set<String> instance;
-
- }
- 复制代码
3.监听数据库变动代码
- @Component
- @Slf4j
- public class MysqlDataListening {
-
- private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
-
- private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
-
- @Autowired
- private CanalInstanceProperties canalInstanceProperties;
-
-
- @PostConstruct
- private void startListening() {
- canalInstanceProperties.getInstance().forEach(
- instanceName -> {
- executors.submit(() -> {
- connector(instanceName);
- });
- }
- );
- }
-
- /**
- * 消费canal的线程池
- */
- public void connector(String instance){
- CanalConnector canalConnector = CanalConnectors.newSingleConnector(
- new InetSocketAddress(canalInstanceProperties.getServerAddress(), canalInstanceProperties.getServerPort()),
- instance, "", "");
- canalConnector.connect();
- //订阅所有消息
- canalConnector.subscribe(".*\..*");
- // canalConnector.subscribe("test1.*"); 只订阅test1数据库下的所有表
- //恢复到之前同步的那个位置
- canalConnector.rollback();
-
- for(;;){
- //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
- Message message = canalConnector.getWithoutAck(100);
- //获取消息id
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (size == 0 || batchId == -1) {
- try{
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
- }
- if(batchId != -1){
- log.info("instance -> {}, msgId -> {}", instance, batchId);
- printEnity(message.getEntries());
- //提交确认
- canalConnector.ack(batchId);
- //处理失败,回滚数据
- //canalConnector.rollback(batchId);
- }
- }
- }
-
- private void printEnity(List<CanalEntry.Entry> entries) {
- for (CanalEntry.Entry entry : entries) {
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
- || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
- continue;
- }
-
- CanalEntry.RowChange rowChange = null;
- try{
- // 序列化数据
- rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- assert rowChange != null;
- CanalEntry.EventType eventType = rowChange.getEventType();
- log.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
-
- if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
- log.info("sql ------------>{}" ,rowChange.getSql());
- }
-
- for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
- switch (rowChange.getEventType()){
- //如果希望监听多种事件,可以手动增加case
- case UPDATE:
- printColumn(rowData.getAfterColumnsList());
- printColumn(rowData.getBeforeColumnsList());
- break;
- case INSERT:
- printColumn(rowData.getAfterColumnsList());
- break;
- case DELETE:
- printColumn(rowData.getBeforeColumnsList());
- break;
- default:
- }
- }
-
- }
- }
-
- private void printColumn(List<CanalEntry.Column> columns) {
- StringBuilder sb = new StringBuilder();
- for (CanalEntry.Column column : columns) {
- sb.append("[");
- sb.append(column.getName()).append(" : ").append(column.getValue()).append(" update=").append(column.getUpdated());
- sb.append("]");
- sb.append(" ");
- }
- log.info(sb.toString());
- }
- 复制代码
注意的问题canal client: 为了保证有序性,一份实例(instance)同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。canal server 上的一个 instance 只能有一个 client 消费。clientId是固定的,Binlog文件落入文件保存。
由于保证了有序性,生产过快而消费慢的问题,如何解决消费堆积问题
其次在使用Canal自带客户端进行同步时需要自己手动调用get()或者getWithoutAck()进行拉取 拉取日志后进行同步只能一条一条处理,效率比较低 为了解决上面的问题打算在日志同步过程中引入MQ来作为中间同步,Canal支持RocketMQ和Kafka两种,最终选用Kafka来进行
canal的原理是借助mysql主从复制的协议,模拟从数据库拉取增量Binlog日。canal通过Instance作为一个从数据库实例,客户端连接实例后有序消费增量的Binlog日志。有几点特别注意的是,一是canal的生产消费模型是一个带指针的数组,分别指向生产位置、消费位置和ack位置,来控制消费和生产的队列。二是Binlog的配置需要时row格式,canal的解析针对row格式做了适配。三是canal通过client竞争的方式保证消费时只有一个client消费,保证binlog的有序性。四是,生产端数据量大的时候canal会存在消费不及时的问题,存在一定延时性。性能分析时业务binlog入库到canal client拿到数据,基本可以达到10~20w的TPS。具体业务解析时肯定要低于这个,不过对于一般业务来说,已足够用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。