赞
踩
提示:以下是本篇文章正文内容,下面案例可供参考
1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历史原因,类似刚开始的商业模式不清晰,导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的,业务运营也是分开,后来又合并成了一个大块业务。
2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中,比如业务的数据存储再Mysql的数据中,后来为了方便检索需要写入到ES,或者为了缓存需要写入到Redis,或者是Mysql分表的数据合并写入到Doris中。
3、数据仓库的场景。比如将表里的数据实时写入到DWS数据仓库的宽表中。
4、应急场景。如果不采专用CDC的方案,那么要达到实时查询的效果,只能在BFF层的代码调用多个中心层的查询API,然后再BFF层做各种聚合,运算。这种方式开发效率低下,万一有的中心层没有提供合适的查询API,临时开发的话,会让开发进度不可控。
总之,不管是数据多写、还是多表合并、还是建立数据仓库,都属于数据同步任务。
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。
目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。
官网地址:官网FlinkCDC
官方定义:This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据FlinkCDC官方给出的定义,FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。
CDC的原理是,当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动。
1、FlinkCDC 提供了对 Debezium 连接器的封装和集成,简化了配置和使用的过程,并提供了更高级的 API 和功能,例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现,将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC,您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table,并进行实时的数据处理、转换和分析。
2、Flink的DataStream和SQL比较成熟和易用
3、Flink支持状态后端(State Backends),允许存储海量的数据状态
4、Flink有更好的生态,更多的Source和Sink的支持
背景:win10电脑安装vmware(虚拟化)软件,虚拟机中安装
linux节点一个,flink,mysql(yum默认安装的最新版本:8.0.37),java环境(此处安装java环境略,网上有)
###在linux下载flink包 [root@slave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz ### 加压包到当前目录下 [root@slave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz [root@slave2 ~]# cd flink-1.16.3 [root@slave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar 修改flink 文件 [root@slave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改内容如下(如果不修改则win10本地电脑无法访问flink web UI,这里浪费很多时间): taskmanager.host: localhost rest.bind-address: 0.0.0.0 ###启动flink [root@slave2 flink-1.16.3]# bin/start-cluster.sh ###启动客户端 root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded
安装mysql遇到很多小问题
[root@slave2 ~]# yum -y install mysql-community-server
“MySQL 8.0 Community Server” 的 GPG 密钥已安装,但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。
失败的软件包是:mysql-community-libs-8.0.37-1.el7.x86_64
###用于跳过GPG签名检查 可以安装成功 [root@slave2 ~]# yum -y install mysql-server --nogpgcheck ###验证myqsl是否可用 [root@slave2 ~]# systemctl start mysqld [root@slave2 ~]# mysql -u root -p Enter password: Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 63 Server version: 8.0.37 MySQL Community Server - GPL Copyright (c) 2000, 2024, Oracle and/or its affiliates. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> ### 配置开启binlog [root@slave2 ~]# vim /etc/my.cnf log_bin=mysql_bin binlog-format=Row server-id=1 ###重启mysql [root@slave2 ~]# systemctl restart mysqld ###重新登陆mysql并查看binlog开启情况 mysql> SHOW VARIABLES LIKE 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.01 sec)
mysql创建新用户等
###创建新用户时发现一直报错。查看mysql建密码时要求比较高(新版本mysql对密码要求高) mysql> SHOW VARIABLES LIKE 'validate_password%'; +-------------------------------------------------+--------+ | Variable_name | Value | +-------------------------------------------------+--------+ | validate_password.changed_characters_percentage | 0 | | validate_password.check_user_name | ON | | validate_password.dictionary_file | | | validate_password.length | 10 | | validate_password.mixed_case_count | 1 | | validate_password.number_count | 1 | | validate_password.policy | MEDIUM | | validate_password.special_char_count | 1 | +-------------------------------------------------+--------+ 8 rows in set (0.00 sec) ###修改密码要求(在工作生产环境不建议这么做) mysql> SET GLOBAL validate_password.length = 3; Query OK, 0 rows affected (0.03 sec) mysql> SET GLOBAL validate_password.policy = LOW; Query OK, 0 rows affected (0.00 sec) ###再次查看对新建用户密码要求 mysql> SHOW VARIABLES LIKE 'validate_password%'; +-------------------------------------------------+-------+ | Variable_name | Value | +-------------------------------------------------+-------+ | validate_password.changed_characters_percentage | 0 | | validate_password.check_user_name | ON | | validate_password.dictionary_file | | | validate_password.length | 4 | | validate_password.mixed_case_count | 1 | | validate_password.number_count | 1 | | validate_password.policy | LOW | | validate_password.special_char_count | 1 | +-------------------------------------------------+-------+ 8 rows in set (0.00 sec) ###新建用户:root1234 mysql> CREATE USER 'root1234'@'localhost' IDENTIFIED BY 'root1234'; Query OK, 0 rows affected (0.01 sec) mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root1234'@'localhost'; Query OK, 0 rows affected (0.02 sec) mysql> GRANT SELECT ON *.* TO 'root1234'@'localhost'; Query OK, 0 rows affected (0.01 sec) mysql> SELECT User, Host FROM mysql.user; +------------------+-----------+ | User | Host | +------------------+-----------+ | mysql.infoschema | localhost | | mysql.session | localhost | | mysql.sys | localhost | | root | localhost | | root1234 | localhost | +------------------+-----------+ 5 rows in set (0.00 sec) mysql> FLUSH PRIVILEGES; Query OK, 0 rows affected (0.00 sec)
建mysql库表(数据源头库表)
Flink SQL>
select * from test_flink_cdc5;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc]
###mysql 默认只有这4个库(当时直接用默认库mysql建表导致flink 报一些上面奇诡的错) mysql> show databases; +--------------------+ | Database | +--------------------+ | information_schema | | mysql | | performance_schema | | sys | +--------------------+ ###新建一个库:test mysql> CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; Query OK, 1 row affected (0.02 sec) mysql> use test; Database changed ###建表 mysql> CREATE TABLE `test_cdc` ( -> `id` int NOT NULL AUTO_INCREMENT, -> `name` varchar(255) DEFAULT NULL, -> PRIMARY KEY (`id`) -> ) ENGINE=InnoDB ; Query OK, 0 rows affected (0.04 sec) mysql> show tables; +----------------+ | Tables_in_test | +----------------+ | test_cdc | +----------------+ 1 row in set (0.00 sec)
##开始验证
##mysql 使用上面创建的用户密码登录mysql [root@slave2 ~]# mysql -u root -p'root1234' mysql> use test ##flink登录 [root@slave2 flink-1.16.3]# bin/stop-cluster.sh [root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded ##在flink sql中定义mysql源 CREATE TABLE test_flink_cdc ( id INT, name STRING, primary key(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username'='root1234', 'password'='root1234', 'database-name'='test', 'table-name'='test_cdc' ); ###查询flink 接收到的binlog数据 Flink SQL> select * from test_flink_cdc; ###到mysql sql界面向test_cdc表插入数据 mysql> INSERT INTO test_cdc VALUES (001, 'test01'); Query OK, 1 row affected (0.02 sec) mysql> INSERT INTO test_cdc VALUES (002, 'test02'); Query OK, 1 row affected (0.01 sec) mysql> INSERT INTO test_cdc VALUES (003, 'test03'); Query OK, 1 row affected (0.04 sec) mysql> INSERT INTO test_cdc VALUES (004, 'test04'); Query OK, 1 row affected (0.01 sec) mysql> INSERT INTO test_cdc VALUES (005, 'test05'); Query OK, 1 row affected (0.01 sec) mysql> INSERT INTO test_cdc VALUES (006, 'test06'); Query OK, 1 row affected (0.00 sec)
向mysql表中插入数据
flink sql这时可以接到binlog数据
查看flink UI job情况
小结:当flink可以拿到mysql binlog源头数据,下面就好做了,根据自己的业务处理sink到任何数据库或组件中(例如sink到mysql,hbase,hive,pg,kafka等等),后面sink就不演示了。
下载链接:
1.mysql jdbc jar包驱动下载
2.flink cdc驱动下载
3.flink下载
datastream 比较灵活简单,下面是举例代码片段(datastream的CDC比flink sql还简单,打个jar包在flink web UI界面上传运行即可,此处不做举例)
public class MySqlSourceExample { public static void main(String[] args) throws Exception { MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("yourHostname") .port(yourPort) .databaseList("yourDatabaseName") // set captured database .tableList("yourDatabaseName.yourTableName") // set captured table .username("yourUsername") .password("yourPassword") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // enable checkpoint env.enableCheckpointing(3000); env .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // set 4 parallel source tasks .setParallelism(4) .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering env.execute("Print MySQL Snapshot + Binlog"); } }
logtail和SLS(阿里云的组件,类似kafka,但要比kafka等功能强大)
1.使用flink cdc不适合直观观察binlog的数据(例如脏数据,数据断流,不能直观看到最近的binglog情况,表的更新频率不高等造成的困扰)。
2.使用阿里云SLS收集mysql binlog数据,将数据保存近1个月(保持时间可设置),同时可以在logstore查看数据结构样式(sls支持sql语法查询日志数据),方便后续flink代码开发,也方便flink sql和datastream代码debug。
3.使用flink cdc做数据质量监控比较后知后觉(只能监控已表数据),我这边做法是直接监控sls原始数据质量,发现有数据质量问题会报出来。sls也支持实时断流提醒。
4.一般对于简单的不太重要的业务适合使用flink cdc,这样开发快,数据流不用咋校验。对于复杂数据或复杂业务或重要数据,需要观察binlog数据结果(不信任上游其他部门数据)还是使用类似sls比较好。方便查询数据变化与汇总,方便做数据报警等。
5.总之还是要根据自己的业务场景和自己公司现有技术组件组合着使用比较好。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。