当前位置:   article > 正文

canal监听mysql实践_cana的sql监听

cana的sql监听

canal监听mysql实践

 canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)。使用场景包括:

1.缓存更新

2.异步数据库或者同步到关系型数据库的中间媒介

canal介绍及工作原理

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

这里也介绍了业务cache刷新和价格变化等重要数据变更消息的监听。

Canal原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

Canal架构及工作原理

  1. server 代表一个 canal 运行实例,对应于一个 jvm
  2. instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )
  3. instance 下的子模块
  4. eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
  5. eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
  6. eventStore: 数据存储
  7. metaManager: 增量订阅 & 消费信息管理器 

  • EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
  • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
  • void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

docker canal搭建

先在Docker Hub中下载canal-server镜像

  1. docker pull canal/canal-server:latest
  2. 复制代码

先启动Canal,用于复制properties配置文件

  1. docker run -p 11111:11111 --name canal -d canal/canal-server:latest
  2. 复制代码

初次启动Canal镜像后,将instance.properties文件复制到宿主机,用于后续挂载使用

  1. docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/
  2. 复制代码

修改instance.properties,该文件主要配置监听的mysql实例

  1. #################################################
  2. ## mysql serverId , v1.0.26+ will autoGen
  3. # canal.instance.mysql.slaveId=0
  4. # enable gtid use true/false 未开启gtid主从同步
  5. canal.instance.gtidon=false
  6. # position info 在同一宿主机内 若有主从数据库,填写主数据库地址
  7. canal.instance.master.address=172.17.0.1:3306
  8. #需要读取的起始的binlog文件 不填写的话默认应该是从最新的Binlog开始监听
  9. canal.instance.master.journal.name=
  10. #需要读取的起始的binlog文件的偏移量
  11. canal.instance.master.position=
  12. #需要读取的起始的binlog的时间戳
  13. canal.instance.master.timestamp=
  14. canal.instance.master.gtid=
  15. # rds oss binlog
  16. canal.instance.rds.accesskey=
  17. canal.instance.rds.secretkey=
  18. canal.instance.rds.instanceId=
  19. # table meta tsdb info
  20. canal.instance.tsdb.enable=true
  21. #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
  22. #canal.instance.tsdb.dbUsername=canal
  23. #canal.instance.tsdb.dbPassword=canal
  24. # 从数据库地址 主备切换时使用
  25. #canal.instance.standby.address =
  26. #canal.instance.standby.journal.name =
  27. #canal.instance.standby.position =
  28. #canal.instance.standby.timestamp =
  29. #canal.instance.standby.gtid=
  30. # username/password
  31. canal.instance.dbUsername=canal
  32. canal.instance.dbPassword=canal
  33. canal.instance.connectionCharset = UTF-8
  34. # enable druid Decrypt database password
  35. canal.instance.enableDruid=false
  36. #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
  37. # table regex
  38. canal.instance.filter.regex=.*\..*
  39. # table black regex 不需要监听的名单
  40. canal.instance.filter.black.regex=mysql\..*,sys\..*,performance_schema\..*,information_schema\..*
  41. # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  42. #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
  43. # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  44. #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
  45. # mq config 默认的sql存储队列
  46. canal.mq.topic=example
  47. # dynamic topic route by schema or table regex
  48. #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*
  49. canal.mq.partition=0
  50. # hash partition config
  51. #canal.mq.enableDynamicQueuePartition=false
  52. #canal.mq.partitionsNum=3
  53. #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
  54. #canal.mq.partitionHash=test.table:id^name,.*\..*
  55. #################################################
  56. 复制代码

Canal为我们提供了canal.instance.filter.regex与canal.instance.filter.black.regex选项参数来过滤数据库表数据解析,类似黑白名单。常见例子有: ●所有表:.* or ... ●canal schema下所有表:canal..* ●canal下的以canal打头的表:canal.canal.* ●canal schema下的一张表:canal.test1 ●多个规则组合使用:canal..*,mysql.test1,mysql.test2 (逗号分隔)

修改canal.properties,该文件主要时配置canal server

  1. #################################################
  2. #########     destinations     #############
  3. #################################################
  4. ##配置监听多数据实例的地方 单数据库监听的话这里配置example就可以
  5. canal.destinations = example
  6. # conf root dir
  7. canal.conf.dir = ../conf
  8. # auto scan instance dir add/remove and start/stop instance
  9. canal.auto.scan = true
  10. canal.auto.scan.interval = 5
  11. # set this value to 'true' means that when binlog pos not found, skip to latest.
  12. # WARN: pls keep 'false' in production env, or if you know what you want.
  13. canal.auto.reset.latest.pos.mode = false
  14. canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
  15. #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
  16. canal.instance.global.mode = spring
  17. canal.instance.global.lazy = false
  18. canal.instance.global.manager.address = ${canal.admin.manager}
  19. #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
  20. canal.instance.global.spring.xml = classpath:spring/file-instance.xml
  21. #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  22. # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 选择的消费队列
  23. canal.serverMode = tcp
  24. 复制代码

消费队列模式与Server-client模式一致,主要区别如下:

  • 不需要CanalServerWithNetty,改为CanalMQProducer投递消息给消息队列
  • 不使用CanalClient,改为MqClient获取消息队列的消息进行消费

这种模式相比于Server-client模式

  • 下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等

  • 会增加系统的复杂度,增加一些延迟

  1. #本地的instance.properties:容器的instance.properties 将容器的instance.properties配置文件挂载到宿主机,方便后续变更
  2. docker stop canal;docker rm canal; 重新生成容器
  3. docker run -p 11111:11111 --name canal -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server:latest
  4. 复制代码

查看消费实例example的日志可以看出canal监听的binlog位置正好是连接时的binlog位置,前提是未指定了Binlog的位置。客户端开始连接后便可以从指定位置开始消费增量的binlog。binlog-format=ROW # 选择 ROW 模式

java客户端实例消费

1.引入pom文件

  1.            <!--canal-->
  2.            <dependency>
  3.                <groupId>com.alibaba.otter</groupId>
  4.                <artifactId>canal.client</artifactId>
  5.                <version>1.1.5</version>
  6.            </dependency>
  7.            <!-- Message、CanalEntry.Entry等来自此安装包 -->
  8.            <dependency>
  9.                <groupId>com.alibaba.otter</groupId>
  10.                <artifactId>canal.protocol</artifactId>
  11.                <version>1.1.5</version>
  12.            </dependency>
  13. 复制代码

2.application.yml配置文件canal

  1. canal:
  2. serverAddress: 42.192.183.193
  3. serverPort: 11111
  4. instance: #多个instance
  5.   - example
  6. 复制代码

对应的properties文件

  1. @Component
  2. @ConfigurationProperties(prefix = "canal")
  3. @Data
  4. public class CanalInstanceProperties {
  5.    /**
  6.     * canal server地址
  7.     */
  8.    private String serverAddress;
  9.    /**
  10.     * canal server端口
  11.     */
  12.    private Integer serverPort;
  13.    /**
  14.     * canal 监听实例
  15.     */
  16.    private Set<String> instance;
  17. }
  18. 复制代码

3.监听数据库变动代码

  1. @Component
  2. @Slf4j
  3. public class MysqlDataListening {
  4.    private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
  5.    private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
  6.    @Autowired
  7.    private CanalInstanceProperties canalInstanceProperties;
  8.    @PostConstruct
  9.    private void startListening() {
  10.        canalInstanceProperties.getInstance().forEach(
  11.            instanceName -> {
  12.                executors.submit(() -> {
  13.                    connector(instanceName);
  14.               });
  15.           }
  16.       );
  17.   }
  18.    /**
  19.     * 消费canal的线程池
  20.     */
  21.    public void connector(String instance){
  22.        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
  23.                new InetSocketAddress(canalInstanceProperties.getServerAddress(), canalInstanceProperties.getServerPort()),
  24.                instance, "", "");
  25.        canalConnector.connect();
  26.        //订阅所有消息
  27.        canalConnector.subscribe(".*\..*");
  28.        // canalConnector.subscribe("test1.*"); 只订阅test1数据库下的所有表
  29.        //恢复到之前同步的那个位置
  30.        canalConnector.rollback();
  31.        for(;;){
  32.            //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
  33.            Message message = canalConnector.getWithoutAck(100);
  34.            //获取消息id
  35.            long batchId = message.getId();
  36.            int size = message.getEntries().size();
  37.            if (size == 0 || batchId == -1) {
  38.                try{
  39.                    Thread.sleep(1000);
  40.               } catch (InterruptedException ignored) {
  41.               }
  42.           }
  43.            if(batchId != -1){
  44.                log.info("instance -> {}, msgId -> {}", instance, batchId);
  45.                printEnity(message.getEntries());
  46.                //提交确认
  47.                canalConnector.ack(batchId);
  48.                //处理失败,回滚数据
  49.                //canalConnector.rollback(batchId);
  50.           }
  51.       }
  52.   }
  53.    private  void printEnity(List<CanalEntry.Entry> entries) {
  54.        for (CanalEntry.Entry entry : entries) {
  55.            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
  56.                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
  57.                continue;
  58.           }
  59.            CanalEntry.RowChange rowChange = null;
  60.            try{
  61.                // 序列化数据
  62.                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  63.           } catch (InvalidProtocolBufferException e) {
  64.                e.printStackTrace();
  65.           }
  66.            assert rowChange != null;
  67.            CanalEntry.EventType eventType = rowChange.getEventType();
  68.            log.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",
  69.                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  70.                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  71.                     eventType));
  72.            if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
  73.                log.info("sql ------------>{}" ,rowChange.getSql());
  74.           }
  75.            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
  76.                    switch (rowChange.getEventType()){
  77.                        //如果希望监听多种事件,可以手动增加case
  78.                        case UPDATE:
  79.                            printColumn(rowData.getAfterColumnsList());
  80.                            printColumn(rowData.getBeforeColumnsList());
  81.                            break;
  82.                        case INSERT:
  83.                            printColumn(rowData.getAfterColumnsList());
  84.                            break;
  85.                        case DELETE:
  86.                            printColumn(rowData.getBeforeColumnsList());
  87.                            break;
  88.                        default:
  89.                   }
  90.               }
  91.       }
  92.   }
  93.    private void printColumn(List<CanalEntry.Column> columns) {
  94.        StringBuilder sb = new StringBuilder();
  95.        for (CanalEntry.Column column : columns) {
  96.            sb.append("[");
  97.            sb.append(column.getName()).append(" : ").append(column.getValue()).append("   update=").append(column.getUpdated());
  98.            sb.append("]");
  99.            sb.append("   ");
  100.       }
  101.        log.info(sb.toString());
  102.   }
  103. 复制代码

注意的问题canal client: 为了保证有序性,一份实例(instance)同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。canal server 上的一个 instance 只能有一个 client 消费。clientId是固定的,Binlog文件落入文件保存。

 由于保证了有序性,生产过快而消费慢的问题,如何解决消费堆积问题

其次在使用Canal自带客户端进行同步时需要自己手动调用get()或者getWithoutAck()进行拉取 拉取日志后进行同步只能一条一条处理,效率比较低 为了解决上面的问题打算在日志同步过程中引入MQ来作为中间同步,Canal支持RocketMQ和Kafka两种,最终选用Kafka来进行

总结

canal的原理是借助mysql主从复制的协议,模拟从数据库拉取增量Binlog日。canal通过Instance作为一个从数据库实例,客户端连接实例后有序消费增量的Binlog日志。有几点特别注意的是,一是canal的生产消费模型是一个带指针的数组,分别指向生产位置、消费位置和ack位置,来控制消费和生产的队列。二是Binlog的配置需要时row格式,canal的解析针对row格式做了适配。三是canal通过client竞争的方式保证消费时只有一个client消费,保证binlog的有序性。四是,生产端数据量大的时候canal会存在消费不及时的问题,存在一定延时性。性能分析时业务binlog入库到canal client拿到数据,基本可以达到10~20w的TPS。具体业务解析时肯定要低于这个,不过对于一般业务来说,已足够用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/85710?site
推荐阅读
相关标签
  

闽ICP备14008679号