当前位置:   article > 正文

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

flink和cannel

背景介绍

本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的形式导入到 Kafka 中,继而被 Flink 消费的案例。

为了能够快速的验证整套流程的功能性,所有的组件都以单机的形式部署。如果手上的物理资源不足,可以将本文中的所有组件搭建在一台 4G 1U 的虚拟机环境中。

如果需要在生产环境中部署,建议将每一个组件替换成高可用的集群部署方案。

其中,我们单独创建了一套 Zookeeper 单节点环境,Flink、Kafka、Canal 等组件共用这个 Zookeeper 环境。

针对于所有需要 JRE 的组件,如 Flink,Kafka,Canal,Zookeeper,考虑到升级 JRE 可能会影响到其他的应用,我们选择每个组件独立使用自己的 JRE 环境。

本文分为两个部分,其中,前七小节主要介绍基础环境的搭建,最后一个小节介绍了数据是如何在各个组件中流通的。

数据的流动经过以下的组件:

  • MySQL 数据源生成 Binlog

  • Canal 读取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中

  • Flink 使用 flink-sql-connector-kafka API,消费 Kafka Topic 中的数据

  • Flink 在通过 flink-connector-jdbc,将数据写入到 TiDB

TiDB + Flink 的结构,支持开发与运行多种不同种类的应用程序。

目前主要的特性主要包括:

  • 批流一体化

  • 精密的状态管理

  • 事件时间支持

  • 精确的一次状态一致性保障

Flink 可以运行在包括 YARN、Mesos、Kubernetes 在内的多种资源管理框架上,还支持裸机集群上独立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同时也支持使用 TiUP 在裸机集群上独立部署。

TiDB + Flink 结构常见的几类应用如下:

  • 事件驱动型应用

  • 反欺诈

  • 异常检测

  • 基于规则的报警

  • 业务流程监控

  • 数据分析应用

  • 网络质量监控

  • 产品更新及试验评估分析

  • 事实数据即席分析

  • 大规模图分析

  • 数据管道应用

  • 电商实时查询索引构建

  • 电商持续 ETL

环境介绍

操作系统环境
  1. [root@r20 topology]# cat /etc/redhat-release
  2. CentOS Stream release 8
软件环境
机器分配

部署 TiDB Cluster

与传统的单机数据库相比,TiDB 具有以下优势:

  • 纯分布式架构,拥有良好的扩展性,支持弹性的扩缩容

  • 支持 SQL,对外暴露 MySQL 的网络协议,并兼容大多数 MySQL 的语法,在大多数场景下可以直接替换 MySQL

  • 默认支持高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务透明

  • 支持 ACID 事务,对于一些有强一致需求的场景友好,例如:银行转账

  • 具有丰富的工具链生态,覆盖数据迁移、同步、备份等多种场景

在内核设计上,TiDB 分布式数据库将整体架构拆分成了多个模块,各模块之间互相通信,组成完整的 TiDB 系统。对应的架构图如下:

在本文中,我们只做最简单的功能测试,所以部署了一套单节点但副本的 TiDB,涉及到了以下的三个模块:

  • TiDB Server:SQL 层,对外暴露 MySQL 协议的连接 endpoint,负责接受客户端的连接,执行 SQL 解析和优化,最终生成分布式执行计划。

  • PD (Placement Driver) Server:整个 TiDB 集群的元信息管理模块,负责存储每个 TiKV 节点实时的数据分布情况和集群的整体拓扑结构,提供 TiDB Dashboard 管控界面,并为分布式事务分配事务 ID。

  • TiKV Server:负责存储数据,从外部看 TiKV 是一个分布式的提供事务的 Key-Value 存储引擎。

TiUP 部署模板文件
  1. # # Global variables are applied to all deployments and used as the default value of
  2. # # the deployments if a specific deployment value is missing.
  3. global:
  4.   user: "tidb"
  5.   ssh_port: 22
  6.   deploy_dir: "/opt/tidb-c1/"
  7.   data_dir: "/opt/tidb-c1/data/"
  8. # # Monitored variables are applied to all the machines.
  9. #monitored:
  10. #  node_exporter_port: 19100
  11. #  blackbox_exporter_port: 39115
  12. #  deploy_dir: "/opt/tidb-c3/monitored"
  13. #  data_dir: "/opt/tidb-c3/data/monitored"
  14. #  log_dir: "/opt/tidb-c3/log/monitored"
  15. # # Server configs are used to specify the runtime configuration of TiDB components.
  16. # # All configuration items can be found in TiDB docs:
  17. # # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
  18. # # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
  19. # # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
  20. # # All configuration items use points to represent the hierarchy, e.g:
  21. # #   readpool.storage.use-unified-pool
  22. # #
  23. # # You can overwrite this configuration via the instance-level `config` field.
  24. server_configs:
  25.   tidb:
  26.     log.slow-threshold: 300
  27.     binlog.enable: false
  28.     binlog.ignore-errorfalse
  29.     tikv-client.copr-cache.enable: true
  30.   tikv:
  31.     server.grpc-concurrency: 4
  32.     raftstore.apply-pool-size: 2
  33.     raftstore.store-pool-size: 2
  34.     rocksdb.max-sub-compactions: 1
  35.     storage.block-cache.capacity: "16GB"
  36.     readpool.unified.max-thread-count: 12
  37.     readpool.storage.use-unified-pool: false
  38.     readpool.coprocessor.use-unified-pool: true
  39.     raftdb.rate-bytes-per-sec: 0
  40.   pd:
  41.     schedule.leader-schedule-limit: 4
  42.     schedule.region-schedule-limit: 2048
  43.     schedule.replica-schedule-limit: 64
  44. pd_servers:
  45.   - host: 192.168.12.21
  46.     ssh_port: 22
  47.     name: "pd-2"
  48.     client_port: 12379
  49.     peer_port: 12380
  50.     deploy_dir: "/opt/tidb-c1/pd-12379"
  51.     data_dir: "/opt/tidb-c1/data/pd-12379"
  52.     log_dir: "/opt/tidb-c1/log/pd-12379"
  53.     numa_node: "0"
  54.     # # The following configs are used to overwrite the `server_configs.pd` values.
  55.     config:
  56.       schedule.max-merge-region-size: 20
  57.       schedule.max-merge-region-keys: 200000
  58. tidb_servers:
  59.   - host: 192.168.12.21
  60.     ssh_port: 22
  61.     port: 14000
  62.     status_port: 12080
  63.     deploy_dir: "/opt/tidb-c1/tidb-14000"
  64.     log_dir: "/opt/tidb-c1/log/tidb-14000"
  65.     numa_node: "0"
  66.     # # The following configs are used to overwrite the `server_configs.tidb` values.
  67.     config:
  68.       log.slow-query-file: tidb-slow-overwrited.log
  69.       tikv-client.copr-cache.enable: true
  70. tikv_servers:
  71.   - host: 192.168.12.21
  72.     ssh_port: 22
  73.     port: 12160
  74.     status_port: 12180
  75.     deploy_dir: "/opt/tidb-c1/tikv-12160"
  76.     data_dir: "/opt/tidb-c1/data/tikv-12160"
  77.     log_dir: "/opt/tidb-c1/log/tikv-12160"
  78.     numa_node: "0"
  79.     # # The following configs are used to overwrite the `server_configs.tikv` values.
  80.     config:
  81.       server.grpc-concurrency: 4
  82.       #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
  83. #monitoring_servers:
  84. #  - host: 192.168.12.21
  85. #    ssh_port: 22
  86. #    port: 19090
  87. #    deploy_dir: "/opt/tidb-c1/prometheus-19090"
  88. #    data_dir: "/opt/tidb-c1/data/prometheus-19090"
  89. #    log_dir: "/opt/tidb-c1/log/prometheus-19090"
  90. #grafana_servers:
  91. #  - host: 192.168.12.21
  92. #    port: 13000
  93. #    deploy_dir: "/opt/tidb-c1/grafana-13000"
  94. #alertmanager_servers:
  95. #  - host: 192.168.12.21
  96. #    ssh_port: 22
  97. #    web_port: 19093
  98. #    cluster_port: 19094
  99. #    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
  100. #    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
  101. #    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
TiDB Cluster 环境

本文重点并非部署 TiDB Cluster,作为快速实验环境,只在一台机器上部署单副本的 TiDB Cluster 集群。不需要部署监控环境。

  1. [root@r20 topology]# tiup cluster display tidb-c1-v409
  2. Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
  3. Cluster type:       tidb
  4. Cluster name:       tidb-c1-v409
  5. Cluster version:    v4.0.9
  6. SSH type:           builtin
  7. Dashboard URL:      http://192.168.12.21:12379/dashboard
  8. ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
  9. --                   ----  ----           -----        -------       ------   --------                      ----------
  10. 192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
  11. 192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
  12. 192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
  13. Total nodes: 4

创建用于测试的表

  1. mysql> show create table t1;
  2. +-------+-------------------------------------------------------------------------------------------------------------------------------+
  3. | Table | Create Table                                                                                                                  |
  4. +-------+-------------------------------------------------------------------------------------------------------------------------------+
  5. | t1    | CREATE TABLE `t1` (
  6.   `id` int(11) NOT NULL,
  7.   PRIMARY KEY (`id`)
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
  9. +-------+-------------------------------------------------------------------------------------------------------------------------------+
  10. 1 row in set (0.00 sec)

部署 Zookeeper 环境

在本实验中单独配置 Zookeeper 环境,为 Kafka 和 Flink 环境提供服务。

作为实验演示方案,只部署单机环境。

解压 Zookeeper 包
  1. [root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
  2. [root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
部署用于 Zookeeper 的 jre
  1. [root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
  2. [root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

修改 /opt/zookeeper/bin/zkEnv.sh 文件,增加 JAVA_HOME 环境变量

  1. ## add bellowing env var in the head of zkEnv.sh
  2. JAVA_HOME=/opt/zookeeper/jre
创建 Zookeeper 的配置文件
  1. [root@r24 conf]# cat zoo.cfg | grep -v "#"
  2. tickTime=2000
  3. initLimit=10
  4. syncLimit=5
  5. dataDir=/opt/zookeeper/data
  6. clientPort=2181
启动 Zookeeper
[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start
检查 Zookeeper 的状态
  1. ## check zk status
  2. [root@r24 bin]# ./zkServer.sh status
  3. ZooKeeper JMX enabled by default
  4. Using config: /opt/zookeeper/bin/../conf/zoo.cfg
  5. Client port found: 2181. Client address: localhost. Client SSL: false.
  6. Mode: standalone
  7. ## check OS port status
  8. [root@r24 bin]# netstat -ntlp
  9. Active Internet connections (only servers)
  10. Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
  11. tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
  12. tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
  13. tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
  14. tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
  15. tcp6       0      0 :::44505                :::*                    LISTEN      15062/java
  16. ## use zkCli tool to check zk connection
  17. [root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
关于 Zookeeper 的建议

我个人有一个关于 Zookeeper 的不成熟的小建议:

Zookeeper 集群版本一定要开启网络监控。

特别是要关注 system metrics 里面的 network bandwidth。

部署 Kafka

Kafka 是一个分布式流处理平台,主要应用于两大类的应用中:

  • 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。(相当于message queue)

  • 构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流处理,通过kafka stream topic和topic之间内部进行变化)

Kafka 有四个核心的 API:

  • The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。

  • The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。

  • The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。

  • The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

在本实验中只做功能性验证,只搭建一个单机版的 Kafka 环境。

下载并解压 Kafka
  1. [root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
  2. [root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka

部署用于 Kafka 的 jre

  1. [root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
  2. [root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

修改 Kafka 的 jre 环境变量

  1. [root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
  2. ## add bellowing line in the head of kafka-run-class.sh
  3. JAVA_HOME=/opt/kafka/jre
修改 Kafka 配置文件
  1. 修改 Kafka 配置文件 /opt/kafka/config/server.properties
  2. ## change bellowing variable in /opt/kafka/config/server.properties
  3. broker.id=0
  4. listeners=PLAINTEXT://192.168.12.22:9092
  5. log.dirs=/opt/kafka/logs
  6. zookeeper.connect=i192.168.12.24:2181
启动 Kafka
[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
查看 Kafka 的版本信息
  1. Kafka 并没有提供 --version 的 optional 来查看 Kafka 的版本信息。
  2. [root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
  3. -rw-r--r-- 1 root root  4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
  4. -rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
  5. -rw-r--r-- 1 root root    41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
  6. -rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
  7. -rw-r--r-- 1 root root   892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
  8. -rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
  9. ... ...

其中 2.13 是 scale 的版本信息,2.7.0 是 Kafka 的版本信息。

部署 Flink

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

支持高吞吐、低延迟、高性能的分布式处理框架Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

本实验只做功能性测试,仅部署单机 Flink 环境。

下载并分发 Flink
  1. [root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
  2. [root@r23 soft]# mv flink-1.12.1 /opt/flink
部署 Flink 的 jre
  1. [root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
  2. [root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
添加 Flink 需要的 lib
  1. Flink 消费 Kafka 数据,需要 flink-sql-connector-kafka 包
  2. Flink 链接 MySQL/TiDB,需要 flink-connector-jdbc 包
  3. [root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
  4. [root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
修改 Flink 配置文件
  1. ## add or modify bellowing lines in /opt/flink/conf/flink-conf.yaml
  2. jobmanager.rpc.address: 192.168.12.23
  3. env.java.home: /opt/flink/jre
启动 Flink
  1. [root@r23 ~]# /opt/flink/bin/start-cluster.sh
  2. Starting cluster.
  3. Starting standalonesession daemon on host r23.
  4. Starting taskexecutor daemon on host r23.
查看 Flink GUI

部署 MySQL

解压 MySQL package
  1. [root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
  2. [root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/
创建 MySQL Service 文件
  1. [root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
  2. [root@r25 support-files]# cat mysqld.service
  3. [Unit]
  4. Description=MySQL 8.0 database server
  5. After=syslog.target
  6. After=network.target
  7. [Service]
  8. Type=simple
  9. User=mysql
  10. Group=mysql
  11. #ExecStartPre=/usr/libexec/mysql-check-socket
  12. #ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
  13. # Note: we set --basedir to prevent probes that might trigger SELinux alarms,
  14. # per bug #547485
  15. ExecStart=/opt/mysql/bin/mysqld_safe
  16. #ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
  17. #ExecStopPost=/opt/mysql/bin/mysql-wait-stop
  18. # Give a reasonable amount of time for the server to start up/shut down
  19. TimeoutSec=300
  20. # Place temp files in a secure directory, not /tmp
  21. PrivateTmp=true
  22. Restart=on-failure
  23. RestartPreventExitStatus=1
  24. # Sets open_files_limit
  25. LimitNOFILE = 10000
  26. # Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
  27. Environment=MYSQLD_PARENT_PID=1
  28. [Install]
  29. WantedBy=multi-user.target
  30. ## copy mysqld.service to /usr/lib/systemd/system/
  31. [root@r25 support-files]# cp mysqld.service  /usr/lib/systemd/system/
创建 my.cnf 文件
  1. [root@r34 opt]# cat /etc/my.cnf
  2. [mysqld]
  3. port=3306
  4. basedir=/opt/mysql
  5. datadir=/opt/mysql/data
  6. socket=/opt/mysql/data/mysql.socket
  7. max_connections = 100
  8. default-storage-engine = InnoDB
  9. character-set-server=utf8
  10. log-error = /opt/mysql/log/error.log
  11. slow_query_log = 1
  12. long-query-time = 30
  13. slow_query_log_file = /opt/mysql/log/show.log
  14. min_examined_row_limit = 1000
  15. log-slow-slave-statements
  16. log-queries-not-using-indexes
  17. #skip-grant-tables
初始化并启动 MySQL
  1. [root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
  2. [root@r25 ~]# chown -R mysql:mysql /opt/mysql
  3. [root@r25 ~]# systemctl start mysqld
  4. ## check mysql temp passord from /opt/mysql/log/error.log
  5. 2021-02-24T02:45:47.3164066 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-
创建一个新的 MySQL 用户用以连接 Canal
  1. ## change mysql temp password firstly
  2. mysql> alter user 'root'@'localhost' identified by 'mysql';
  3. Query OK, 0 rows affected (0.00 sec)
  4. ## create a management user 'root'@'%'
  5. mysql> create user 'root'@'%' identified by 'mysql';
  6. Query OK, 0 rows affected (0.01 sec)
  7. mysql> grant all privileges on *.* to 'root'@'%';
  8. Query OK, 0 rows affected (0.00 sec)
  9. ## create a canal replication user 'canal'@'%'
  10. mysql> create user 'canal'@'%' identified by 'canal';
  11. Query OK, 0 rows affected (0.01 sec)
  12. mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
  13. Query OK, 0 rows affected (0.00 sec)
  14. mysql> flush privileges;
  15. Query OK, 0 rows affected (0.00 sec)
在 MySQL 中创建用于测试的表
  1. mysql> show create table test.t2;
  2. +-------+----------------------------------------------------------------------------------+
  3. | Table | Create Table                                                                     |
  4. +-------+----------------------------------------------------------------------------------+
  5. | t2    | CREATE TABLE `t2` (
  6.   `id` int DEFAULT NULL
  7. ) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
  8. +-------+----------------------------------------------------------------------------------+
  9. 1 row in set (0.00 sec)

部署 Canal

Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。

从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

解压 Canal 包
[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal
部署 Canal 的 jre
  1. [root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
  2. [root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
  3. ## configue jre, add bellowing line in the head of /opt/canal/bin/startup.sh 
  4. JAVA=/opt/canal/jre/bin/java
修改 Canal 的配置文件
  1. 修改 /opt/canal/conf/canal.properties 配置文件
  2. ## modify bellowing configuration
  3. canal.zkServers =192.168.12.24:2181
  4. canal.serverMode = kafka
  5. canal.destinations = example        ## 需要在 /opt/canal/conf 目录下创建一个 example 文件夹,用于存放 destination 的配置
  6. canal.mq.servers = 192.168.12.22:9092
  7. 修改 /opt/canal/conf/example/instance.properties 配置文件
  8. ## modify bellowing configuration
  9. canal.instance.master.address=192.168.12.25:3306
  10. canal.instance.dbUsername=canal
  11. canal.instance.dbPassword=canal
  12. canal.instance.filter.regex=.*\\..*                    ## 过滤数据库的表
  13. canal.mq.topic=canal-kafka

配置数据流向

MySQL Binlog -> Canal -> Kafka 通路
查看 MySQL Binlog 信息

查看 MySQL Binlog 信息,确保 Binlog 是正常的

  1. mysql> show master status;
  2. +---------------+----------+--------------+------------------+-------------------+
  3. | File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
  4. +---------------+----------+--------------+------------------+-------------------+
  5. | binlog.000001 |     2888 |              |                  |                   |
  6. +---------------+----------+--------------+------------------+-------------------+
  7. 1 row in set (0.00 sec)
在 Kafka 中创建一个 Topic

在 Kafka 中创建一个 Topic canal-kafka,这个Topic 的名字要与 Canal 配置文件 /opt/canal/conf/example/instance.properties 中的 canal.mq.topic=canal-kafka 对应:

  1. [root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
  2. > --zookeeper 192.168.12.24:2181 \
  3. > --config max.message.bytes=12800000 \
  4. > --config flush.messages=1 \
  5. > --replication-factor 1 \
  6. > --partitions 1 \
  7. > --topic canal-kafka
  8. Created topic canal-kafka.
  9. [2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
  10. [2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
  11. [2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
  12. [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
  13. [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)

查看 Kafka 中所有的 Topic:

  1. [root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
  2. __consumer_offsets
  3. canal-kafka
  4. ticdc-test

查看 Kafka 中 Topic ticdc-test 的信息:

  1. [root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181  --topic canal-kafka
  2. Topic: ticdc-test       PartitionCount: 1       ReplicationFactor: 1    Configs: max.message.bytes=12800000,flush.messages=1
  3.         Topic: ticdc-test       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
8.1.3 启动 Canal

在启动 Canal 之前,需要在 Canal 节点上查看一下端口的情况:

  1. ## check MySQL 3306 port
  2. ## canal.instance.master.address=192.168.12.25:3306
  3. [root@r26 bin]# telnet 192.168.12.25 3306
  4. ## check Kafka 9092 port
  5. ## canal.mq.servers = 192.168.12.22:9092
  6. [root@r26 bin]# telnet 192.168.12.22 9092
  7. ## check zookeeper 2181 port
  8. ## canal.zkServers = 192.168.12.24:2181
  9. [root@r26 bin]# telnet 192.168.12.24 2181

启动 Canal:

  1. [root@r26 bin]# /opt/canal/bin/startup.sh
  2. cd to /opt/canal/bin for workaround relative path
  3. LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
  4. canal conf : /opt/canal/bin/../conf/canal.properties
  5. CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
  6. cd to /opt/canal/bin for continue
查看 Canal 日志

查看 /opt/canal/logs/example/example.log

  1. 2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
  2. 2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
  3. 2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump
查看 Kafka 中 consumer 信息

在 MySQL 中插入一条测试信息:

  1. mysql> insert into t2 values(1);
  2. Query OK, 1 row affected (0.00 sec)

查看 consumer 的信息,已经有了刚才插入的测试数据:

  1. /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
  2. {"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
  3. {"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
  4. {"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}
Kafka -> Flink 通路

在 Flink 中创建 t2 表,connector 类型为 kafka

  1. ## create a test table t2 in Flink
  2. Flink SQL> create table t2(id int)
  3. > WITH (
  4. >  'connector' = 'kafka',
  5. >  'topic' = 'canal-kafka',
  6. >  'properties.bootstrap.servers' = '192.168.12.22:9092',
  7. >  'properties.group.id' = 'canal-kafka-consumer-group',
  8. >  'format' = 'canal-json',
  9. >  'scan.startup.mode' = 'latest-offset'
  10. > );
  11. Flink SQL> select * from t1;

在 MySQL 中在插入一条测试数据:

  1. mysql> insert into test.t2 values(2);
  2. Query OK, 1 row affected (0.00 sec)

从 Flink 中可以实时同步数据:

  1. Flink SQL> select * from t1;
  2.  Refresh: 1 s                                                                                                             Page: Last of 1                                                                                                     Updated: 02:49:27.366
  3.                         id
  4.                          2
Flink -> TiDB 通路
在 下游的 TiDB 中创建用于测试的表
  1. [root@r20 soft]# mysql -uroot -P14000 -hr21
  2. mysql> create table t3 (id int);
  3. Query OK, 0 rows affected (0.31 sec)
在 Flink 中创建测试表
  1. Flink SQL> CREATE TABLE t3 (
  2. >     id int
  3. > ) with (
  4. >     'connector' = 'jdbc',
  5. >     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
  6. >     'table-name' = 't3',
  7. >     'username' = 'root',
  8. >     'password' = 'mysql'
  9. > );
  10. Flink SQL> insert into t3 values(3);
  11. [INFO] Submitting SQL update statement to the cluster...
  12. [INFO] Table update statement has been successfully submitted to the cluster:
  13. Job ID: a0827487030db177ee7e5c8575ef714e
在下游 TiDB 中查看插入的数据
  1. mysql> select * from test.t3;
  2. +------+
  3. | id   |
  4. +------+
  5. |    3 |
  6. +------+
  7. 1 row in set (0.00 sec)

本文转自:https://asktug.com/t/topic/68731 作者:懂的都懂

end

  1. Flink 从入门到精通 系列文章
  2. 基于 Apache Flink 的实时监控告警系统关于数据中台的深度思考与总结(干干货)日志收集Agent,阴暗潮湿的地底世界

  1. 公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug ????
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/723335
推荐阅读
相关标签
  

闽ICP备14008679号