当前位置:   article > 正文

Flink CDC读取MySQL的例子_flinkcdc读取mysql

flinkcdc读取mysql

前面转载了Flink CDC的机制文章,原理看起来还是比较好理解的,那么接下来很多人肯定都跃跃欲试了。

这篇文章分享一下MySQL作为源端,使用Flink SQL如何读取MySQL数据

注意:如果经验有限,在进行所有的软件安装配置操作之前,请先仔细阅读一遍Flink CDC官方文档,目的主要是从文档中获取到Flink CDC与Flink版本对照关系及其他软件的支持版本

本文章使用的软件版本如下:

  • flink-1.13.3
  • flink-sql-connector-mysql-cdc-2.0.2.jar
  • 10.3.29-MariaDB-log
  • openjdk version "11.0.11" 2021-04-20

1.MySQL配置

在进行配置之前,首先你需要自行安装MySQL,具体步骤这里不展开说了,大家如果安装MySQL有困难还请自己去百度一下。

当前Flink CDC官方宣称支持的MySQL版本信息如下:

  • Database: 5.7, 8.0.x
  • JDBC Driver: 8.0.16

我这边测试过程中使用的是MariaDB 10,也是支持的。

1.1 启用MySQL binlog

修改my.cnf文件,增加如下信息:

  1. server_id=1
  2. log_bin=mysql-bin
  3. binlog_format=ROW
  4. expire_logs_days=30
  5. binlog_do_db=db_a
  6. binlog_do_db=db_b

配置项的解释如下:

  • server_id:MySQL5.7及以上版本开启binlog必须要配置这个选项。对于MySQL集群,不同节点的server_id必须不同。对于单实例部署则没有要求。
  • log_bin:指定binlog文件名和储存位置。如果不指定路径,默认位置为/var/lib/mysql/。
  • binlog_format:binlog格式。有3个值可以选择:ROW:记录哪条数据被修改和修改之后的数据,会产生大量日志。STATEMENT:记录修改数据的SQL,日志量较小。MIXED:混合使用上述两个模式。CDC要求必须配置为ROW。
  • expire_logs_days:bin_log过期时间,超过该时间的log会自动删除。
  • binlog_do_db:binlog记录哪些数据库。如果需要配置多个库,如例子中配置多项。切勿使用逗号分隔。

配置文件修改完毕后保存并重启MySQL。然后进入MySQL命令行,验证是否已启用binlog:

  1. SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
  2. FROM information_schema.global_variables WHERE variable_name='log_bin';

返回结果如果显示ON则表示binlog启用。

1.2 初始化MySQL表

在MySQL中创建测试数据库和测试表demo.products,语句如下:

  1. create database demo;
  2. use demo;
  3. CREATE TABLE `products` (
  4. `id` int(11) NOT NULL,
  5. `name` varchar(20) DEFAULT NULL,
  6. `description` varchar(20) DEFAULT NULL,
  7. `weight` decimal(10,3) DEFAULT NULL,
  8. PRIMARY KEY (`id`)
  9. ) ;

1.3 向MySQL表中插入测试数据

  1. INSERT INTO demo.products
  2. (id, name, description, weight)
  3. VALUES(1, 'a', 'a', 20.000);
  4. INSERT INTO demo.products
  5. (id, name, description, weight)
  6. VALUES(2, 'b', 'b', 30.000);
  7. INSERT INTO demo.products
  8. (id, name, description, weight)
  9. VALUES(3, 'c', 'c', 40.000);
  10. INSERT INTO demo.products
  11. (id, name, description, weight)
  12. VALUES(4, 'd', 'd', 50.000);

2.Flink配置

2.1 Flink下载

Flink官方网站下载flink-1.13.3软件包,我这里下载的版本如下:

  • flink-1.13.3-bin-scala_2.11.tgz

将下载的软件包解压缩到你习惯使用的软件目录,我这里就直接放在Downloads目录下了。

2.2 下载mysql-cdc jar

到flink cdc的github release页面下载最新的jar包flink-sql-connector-mysql-cdc-2.0.2.jar

将下载好的jar包,放到2.1步解压后flink文件夹的lib目录下。

2.3 启动Flink

进入flink/bin目录,执行[./start-cluster.sh]启动flink测试环境。

3.在Flink SQL中读取MySQL

进入flink/bin目录,执行[./sql-client.sh]启动Flink SQL。

3.1 创建Flink SQL 表

  1. CREATE TABLE mysql_binlog (
  2. id INT NOT NULL,
  3. name STRING,
  4. description STRING,
  5. weight DECIMAL(10,3),
  6. PRIMARY KEY(id) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'mysql-cdc',
  9. 'hostname' = '192.168.0.200',
  10. 'port' = '3306',
  11. 'username' = 'root',
  12. 'password' = 'passwd',
  13. 'database-name' = 'demo',
  14. 'table-name' = 'products'
  15. );

3.2 执行查询

SELECT id, UPPER(name), description, weight FROM mysql_binlog;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/1005219
推荐阅读
相关标签
  

闽ICP备14008679号