赞
踩
本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。
假设我们将产品数据存储在MySQL中,同步到另外一个MySQL中
在下面的章节中,我们将介绍如何使用 Flink Mysql CDC 来实现它。本教程中的所有练习均在 Flink SQL CLI 中进行,整个过程使用标准 SQL 语法,无需任何 Java/Scala 代码,也无需安装 IDE。
架构概述如下:
需要准备安装好的MySQL数据库,具体MySQL数据怎么安装请查看笔者的博客Ubuntu数据库安装(mysql)
注意: 如果是其他操作系统请查看其他博客对应的数据库安装教程
使用以下命令启动 Flink SQL CLI:
./bin/sql-client.sh
我们应该看到 CLI 客户端的欢迎屏幕。
首先,每 3 秒启用一次检查点
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
编辑源数据库Flink Sql代码,如下所示:
CREATE TABLE products (
id INT NOT NULL,
name STRING,
description STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
'port' = '3306', #源数据库端口
'username' = 'root',#源数据库账号
'password' = '\*\*\*\*\*',#源数据库密码
'database-name' = 'mydb',#源数据库
'table-name' = 'products'#源数据库表
);
在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表
-- Flink SQL Flink SQL> CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.50.163', 'port' = '3306', 'username' = 'root', 'password' = '\*\*\*\*', 'database-name' = 'mydb', 'table-name' = 'products' );
编辑目标数据库Flink Sql代码,如下所示:
CREATE TABLE product ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( #引入的jdbc jar包驱动,没有引入会报错提示需要引入 flink-connector-jdbc 'connector' = 'jdbc', #目标数据库连接url地址,可以根据自己的具体设置,此处为笔者本机的。部分高版本的MySQL需要添加useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC 'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', #需要访问的数据库驱动 'driver' = 'com.mysql.cj.jdbc.Driver', #目标数据库账号 'username' = 'root', #目标据库密码 'password' = '\*\*\*', #目标数据库表 'table-name' = 'product' );
在Flink SQL 执行以下语句创建捕获更改数据的表与目标数据库表的映射关系
-- Flink SQL
Flink SQL> CREATE TABLE product (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
'table-name' = 'product'
);
使用Flink SQL将表product与 表查询products表写入目标MySQL。
-- Flink SQL
Flink SQL> insert into product select * from products;
具体操作步骤如下所示:
这是源数据库,操作添加数据,如下图所示:
目标数据库同步操作如下图
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
[外链图片转存中…(img-bjg180DR-1713035295230)]
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。