Flink CDC + Hudi in Practice
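I. Dependencies
To set up the MySQL CDC connector, the following gives the dependency information for two cases: projects built with a build automation tool (such as Maven or SBT), and the SQL Client with the SQL JAR bundle.
1. Maven dependency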
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>2.0.0</version>
- </dependency>
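2. SQL Client JAR
Download flink-sql-connector-mysql-cdc-2.0.0.jar and put it under <FLINK_HOME>/lib/.
II. Set up the MySQL server
You must define a MySQL user that has the appropriate permissions on every database the Debezium MySQL connector monitors.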
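1. Create a MySQL user:
mysql> CREATE USER 'user'@'%' IDENTIFIED BY 'password';
2. Grant the required permissions to the user:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
3. Finalize the user's permissions:
mysql> FLUSH PRIVILEGES;
The host part '%' allows connections from any host. The Flink TaskManagers need this because they connect remotely (the jobs below connect to 192.168.88.161). The password is set only in CREATE USER, because MySQL 8.0 no longer accepts IDENTIFIED BY in a GRANT statement.
For more information about the permissions, see: https://debezium.io/documentation/reference/1.2/connectors/mysql.html#_permissions_explained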
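III. Notes
1. How the MySQL CDC source works
When the MySQL CDC source starts, it acquires a global read lock (FLUSH TABLES WITH READ LOCK), which blocks writes from other clients. It then reads the current binlog position along with the database and table schemas, and releases the global read lock. Next, it scans the database tables and then reads the binlog from the previously recorded position. Flink periodically takes checkpoints that record the binlog position. If a failure occurs, the job restarts and resumes from the binlog position stored in the last completed checkpoint. This is how the source guarantees exactly-once semantics.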
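The sequence above can be sketched as a toy model in Python. FakeMySQL and CdcSource are hypothetical stand-ins, not Flink CDC or Debezium classes. The view taken while the lock is held stands in, by assumption, for the consistent-snapshot transaction the real connector relies on. The usage at the bottom shows that after a restore, only the binlog events after the last completed checkpoint are read again.

```python
# Toy model of the lock-based snapshot + binlog flow described above.
# FakeMySQL and CdcSource are hypothetical, not Flink CDC/Debezium classes.
from dataclasses import dataclass, field


@dataclass
class FakeMySQL:
    rows: dict                                    # primary key -> value
    binlog: list = field(default_factory=list)    # (pk, value) change events
    locked: bool = False

    def write(self, pk, value):
        if self.locked:                           # the global read lock blocks writes
            raise RuntimeError("blocked by FLUSH TABLES WITH READ LOCK")
        self.rows[pk] = value
        self.binlog.append((pk, value))


@dataclass
class CdcSource:
    db: FakeMySQL
    offset: int = 0                               # next binlog position to read

    def snapshot(self):
        self.db.locked = True                     # 1. FLUSH TABLES WITH READ LOCK
        self.offset = len(self.db.binlog)         # 2. record the binlog position
        view = dict(self.db.rows)                 #    consistent view taken under the lock
        self.db.locked = False                    # 3. release the lock
        return sorted(view.items())               # 4. scan the tables

    def poll_binlog(self):
        events = self.db.binlog[self.offset:]     # 5. read binlog from the recorded position
        self.offset = len(self.db.binlog)
        return events

    def checkpoint(self):
        return self.offset                        # the checkpoint stores the binlog position

    def restore(self, offset):
        self.offset = offset                      # resume after a failure


db = FakeMySQL(rows={1: "a"})
src = CdcSource(db)
snap = src.snapshot()            # [(1, 'a')]
db.write(2, "b")
before_cp = src.poll_binlog()    # [(2, 'b')]
saved = src.checkpoint()         # checkpoint completes at offset 1
db.write(3, "c")
after_cp = src.poll_binlog()     # [(3, 'c')], then the job fails
src.restore(saved)
replayed = src.poll_binlog()     # [(3, 'c')]: only events after the checkpoint
```

Re-reading those events does not duplicate committed output in Flink. The downstream state is rolled back to the same checkpoint, and the Hudi sink in this article commits on checkpoints. The toy does not model that part.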
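2. Grant the RELOAD permission to the MySQL user
If the MySQL user is not granted the RELOAD permission, the MySQL CDC source falls back to table-level locks and takes the snapshot that way. This blocks writes for a longer time.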
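3. Global read lock (FLUSH TABLES WITH READ LOCK)
The global read lock is held while the binlog position and the schemas are being read. Depending on the number of tables, this can take a few seconds. Because the global read lock blocks writes, it can still affect online business.
If you want to skip the read lock and can tolerate at-least-once semantics, add the option 'debezium.snapshot.locking.mode' = 'none'.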
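The toy model below shows one way the weaker guarantee can surface. It rests on a simplifying assumption: the binlog position is recorded slightly before the snapshot view is taken. A write landing in between then appears both in the snapshot and in the binlog replay. The sketch does not model Debezium's actual snapshot steps, and the function names are hypothetical. Because the Hudi tables later in this article upsert on the primary key, such a duplicate is absorbed.

```python
# Toy model: how skipping the lock can turn exactly-once into at-least-once.
# Assumption for illustration: the binlog position is recorded before the
# snapshot view, with no lock in between. Names are hypothetical.

def snapshot_without_lock(rows, binlog, concurrent_writes):
    position = len(binlog)                 # binlog position recorded first
    for pk, value in concurrent_writes:    # writes the lock would have blocked
        rows[pk] = value
        binlog.append((pk, value))
    view = sorted(rows.items())            # the snapshot already sees those writes
    replay = binlog[position:]             # ...and the binlog emits them again
    return view, replay


def upsert(snapshot, events):
    table = dict(snapshot)
    for pk, value in events:               # a keyed upsert makes the replay idempotent
        table[pk] = value
    return table


view, replay = snapshot_without_lock({1: "a"}, [], [(2, "b")])
emitted = view + replay                    # (2, 'b') is emitted twice
final = upsert(view, replay)               # {1: 'a', 2: 'b'}
```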
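4. Set a different SERVER ID for each job
Every MySQL client that reads the binlog must have a unique ID, called the server id. The MySQL server uses this ID to maintain the network connection and the binlog position. If different jobs share the same server id, they may read from the wrong binlog position.
Tip: by default, a random server id is chosen when the TaskManager starts. If the TaskManager fails, it may get a different server id when it starts again. This should not happen often (a job exception does not restart the TaskManager), and it should not affect the MySQL server much.
Therefore, it is recommended to set a different server id for each job. You can use the 'server-id' table option (the DDL later in this article uses 5401 and 5402) or an SQL hint such as SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */;
Key point: the binlog is written per MySQL server and covers every database on it. Using the same server id to pull different tables (or the same table) from one database can therefore lose data, so it is recommended to always set the server id explicitly. (I also asked Jark about this on the community mailing list.)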
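A small check over the planned 'server-id' values may catch accidental sharing before jobs are submitted. This is a hypothetical helper, not part of Flink CDC. It accepts the single-id form used in this article ('5401') and the range form ('5400-5404') that Flink CDC 2.x accepts for parallel incremental snapshots. The job names are illustrative.

```python
# Hypothetical pre-submit check: no two MySQL CDC jobs may share a server id.
# A 'server-id' value is a single id ("5401") or an inclusive range ("5400-5404").

def parse_server_ids(value):
    """Expand a 'server-id' option value into the set of ids it covers."""
    if "-" in value:
        start, end = (int(part) for part in value.split("-", 1))
        if start > end:
            raise ValueError(f"bad server-id range: {value}")
        return set(range(start, end + 1))
    return {int(value)}


def find_conflicts(jobs):
    """Return (job_a, job_b, shared_ids) for every pair of overlapping jobs."""
    expanded = {name: parse_server_ids(v) for name, v in jobs.items()}
    names = sorted(expanded)
    conflicts = []
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            shared = expanded[a] & expanded[b]
            if shared:
                conflicts.append((a, b, sorted(shared)))
    return conflicts


ok = find_conflicts({"order_info": "5401", "order_detail": "5402"})
bad = find_conflicts({"order_info": "5401", "other_job": "5400-5404"})
```

Here ok is empty, while bad reports that order_info and other_job both claim 5401. Keep in mind that the ids must also differ from those of any other replica or client reading the same MySQL server.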
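5. Checkpoints cannot be taken while the database tables are being scanned
While the tables are being scanned there is no recoverable position, so checkpoints cannot be taken. To avoid completing a checkpoint, the MySQL CDC source keeps it waiting until it times out. A timed-out checkpoint counts as a failed checkpoint, and by default this triggers a failover of the Flink job. This applies to the lock-based snapshot used in this article ('scan.incremental.snapshot.enabled' = 'false'); the incremental snapshot algorithm introduced in Flink CDC 2.0 can take checkpoints during the snapshot phase. If the database tables are large, it is recommended to add the following Flink configuration to avoid failovers caused by timed-out checkpoints: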
- execution.checkpointing.interval: 10min
- execution.checkpointing.tolerable-failed-checkpoints: 100
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 2147483647
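Here is a rough estimate of what the settings above buy, under stated assumptions. While the snapshot runs, every checkpoint fails. At most one checkpoint is in flight (Flink's default), so one fails roughly every max(interval, timeout). The job fails over once the tolerable count is exceeded. This is a back-of-the-envelope model, not a formula from Flink.

```python
# Back-of-the-envelope estimate, not a Flink formula: while the snapshot
# blocks checkpoints, assume one checkpoint fails every max(interval, timeout)
# and the job fails over once more than `tolerable_failures` pile up.
from datetime import timedelta

MAX_RESTART_ATTEMPTS = 2**31 - 1   # 2147483647, Java's Integer.MAX_VALUE


def snapshot_budget(interval, timeout, tolerable_failures):
    """Approximate longest snapshot the job survives without a failover."""
    return max(interval, timeout) * tolerable_failures


# interval from the config above; 10 min is Flink's default checkpoint timeout
budget = snapshot_budget(timedelta(minutes=10), timedelta(minutes=10), 100)
hours = budget.total_seconds() / 3600
```

With the values above, this gives about 1000 minutes (roughly 16.7 hours) of snapshot time before a checkpoint-triggered failover. The very large restart-attempts value then keeps the fixed-delay strategy from giving up.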
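6. Set MySQL session timeouts
When an initial consistent snapshot is taken of a large database, the established connection may time out while the tables are being read. You can prevent this by configuring interactive_timeout and wait_timeout in the MySQL configuration file.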
IV. Ingesting Flink CDC data into the lake (Hudi)
1. Create the MySQL source tables
- -- MySQL
- /*Table structure for table `order_info` */
- DROP TABLE IF EXISTS `order_info`;
- CREATE TABLE `order_info` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
- `consignee` varchar(100) DEFAULT NULL COMMENT 'consignee',
- `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone number',
- `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
- `order_status` varchar(20) DEFAULT NULL COMMENT 'order status: 1 = placed, 2 = paid',
- `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
- `payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
- `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'delivery address',
- `order_comment` varchar(200) DEFAULT NULL COMMENT 'order remarks',
- `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'order transaction number (for third-party payment)',
- `trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
- `create_time` datetime DEFAULT NULL COMMENT 'creation time',
- `operate_time` datetime DEFAULT NULL COMMENT 'operation time',
- `expire_time` datetime DEFAULT NULL COMMENT 'expiration time',
- `tracking_no` varchar(100) DEFAULT NULL COMMENT 'logistics tracking number',
- `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
- `img_url` varchar(200) DEFAULT NULL COMMENT 'image path',
- `province_id` int(20) DEFAULT NULL COMMENT 'region',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order table';
- -- ----------------------------
- -- Records of order_info
- -- ----------------------------
- INSERT INTO `order_info`
- VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
- INSERT INTO `order_info`
- VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
- INSERT INTO `order_info`
- VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
- INSERT INTO `order_info`
- VALUES (480, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
- INSERT INTO `order_info`
- VALUES (481, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
- INSERT INTO `order_info`
- VALUES (482, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
- INSERT INTO `order_info`
- VALUES (483, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
- INSERT INTO `order_info`
- VALUES (484, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
- INSERT INTO `order_info`
- VALUES (485, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
- INSERT INTO `order_info`
- VALUES (486, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
- INSERT INTO `order_info`
- VALUES (487, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
- INSERT INTO `order_info`
- VALUES (488, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
- INSERT INTO `order_info`
- VALUES (489, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
- INSERT INTO `order_info`
- VALUES (490, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
- INSERT INTO `order_info`
- VALUES (491, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
-
- /*Table structure for table `order_detail` */
- CREATE TABLE `order_detail` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
- `order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
- `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
- `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name (redundant)',
- `img_url` varchar(200) DEFAULT NULL COMMENT 'image name (redundant)',
- `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (SKU price when ordered)',
- `sku_num` varchar(200) DEFAULT NULL COMMENT 'quantity purchased',
- `create_time` datetime DEFAULT NULL COMMENT 'creation time',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';
-
- -- ----------------------------
- -- Records of order_detail
- -- ----------------------------
- INSERT INTO `order_detail`
- VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
- INSERT INTO `order_detail`
- VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
- INSERT INTO `order_detail`
- VALUES (1331, 478, 4, '小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
- INSERT INTO `order_detail`
- VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
- INSERT INTO `order_detail`
- VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');
2. Create Flink SQL mapping tables with the Flink CDC MySQL connector
- CREATE TABLE order_info(
- id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
- user_id BIGINT,
- create_time TIMESTAMP(0),
- operate_time TIMESTAMP(0),
- province_id INT,
- order_status STRING,
- total_amount DECIMAL(10, 5)
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '192.168.88.161',
- 'port' = '3306',
- 'username' = 'user',
- 'password' = 'password',
- 'database-name' = 'bigdata',
- 'table-name' = 'order_info',
- 'server-id' = '5401',
- 'scan.incremental.snapshot.enabled'='false'
- );
-
- set sql-client.execution.result-mode=tableau;
-
- select * from order_info;
-
- CREATE TABLE order_detail(
- id BIGINT,
- order_id BIGINT,
- sku_id BIGINT,
- sku_name STRING,
- sku_num BIGINT,
- order_price DECIMAL(10, 5),
- create_time TIMESTAMP(0)
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '192.168.88.161',
- 'port' = '3306',
- 'username' = 'user',
- 'password' = 'password',
- 'database-name' = 'bigdata',
- 'table-name' = 'order_detail',
- 'server-id' = '5402',
- 'scan.incremental.snapshot.enabled'='false'
- );
-
- select * from order_detail;
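A mysql-cdc table is a changelog source rather than a static table. Snapshot rows arrive as inserts, and each MySQL UPDATE becomes an update-before/update-after pair. The sketch below is a simplified model of that mapping. The event dict shape is hypothetical; only the op codes follow Debezium's convention. It replays order 477 being paid (order_status '1' to '2', per the column comment).

```python
# Simplified mapping from a Debezium-style change event to Flink row kinds:
# +I insert, -U update-before, +U update-after, -D delete.
# The event dict shape is hypothetical; the op codes ('r' snapshot read,
# 'c' create, 'u' update, 'd' delete) follow Debezium's convention.

def to_changelog(event):
    op = event["op"]
    if op in ("r", "c"):
        return [("+I", event["after"])]
    if op == "u":
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":
        return [("-D", event["before"])]
    raise ValueError(f"unsupported op: {op}")


def apply_changelog(table, changes, key="id"):
    """Materialize a changelog into a dict keyed by primary key."""
    for kind, row in changes:
        if kind in ("+I", "+U"):
            table[row[key]] = row
        elif kind in ("-U", "-D"):
            table.pop(row[key], None)
    return table


snapshot = to_changelog({"op": "r", "after": {"id": 477, "order_status": "1"}})
update = to_changelog({
    "op": "u",
    "before": {"id": 477, "order_status": "1"},
    "after": {"id": 477, "order_status": "2"},
})
state = apply_changelog({}, snapshot + update)
```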
3. Create Hudi tables with the Flink SQL Hudi connector
- CREATE TABLE order_info_hudi(
- id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
- user_id BIGINT,
- create_time TIMESTAMP(0),
- operate_time TIMESTAMP(0),
- province_id INT,
- order_status STRING,
- total_amount DECIMAL(10, 5)
- ) WITH (
- 'connector' = 'hudi',
- 'path' = 'hdfs://node1:8020/hudi/order_info_hudi',
- 'table.type' = 'MERGE_ON_READ',
- 'changelog.enabled' = 'true',
- 'write.precombine.field' = 'create_time',
- 'compaction.async.enabled' = 'false'
- );
-
-
- CREATE TABLE order_detail_hudi(
- id BIGINT,
- order_id BIGINT,
- sku_id BIGINT,
- sku_name STRING,
- sku_num BIGINT,
- order_price DECIMAL(10, 5),
- create_time TIMESTAMP(0)
- ) WITH (
- 'connector' = 'hudi',
- 'path' = 'hdfs://node1:8020/hudi/order_detail_hudi',
- 'table.type' = 'MERGE_ON_READ',
- 'changelog.enabled' = 'true',
- 'write.precombine.field' = 'create_time',
- 'hoodie.datasource.write.recordkey.field' = 'id',
- 'compaction.async.enabled' = 'false'
- );
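'write.precombine.field' tells Hudi which field decides the winner when several records share a record key. The following is a simplified model of that rule, not Hudi's implementation. The tie-breaking behavior is an assumption, and real merging depends on the Hudi version, payload class, and table type. An UPDATE normally leaves create_time unchanged, so ties are the common case for these tables.

```python
# Simplified model of precombine: among records that share a record key,
# keep the one with the largest precombine value. Ties go to the record that
# arrived later (an assumption of this sketch, not Hudi's documented contract).
from datetime import datetime


def precombine(records, key_field="id", precombine_field="create_time"):
    latest = {}
    for rec in records:                       # records in arrival order
        current = latest.get(rec[key_field])
        if current is None or rec[precombine_field] >= current[precombine_field]:
            latest[rec[key_field]] = rec
    return latest


records = [
    # order 477 is paid: an UPDATE that leaves create_time unchanged (a tie)
    {"id": 477, "order_status": "1", "create_time": datetime(2020, 6, 18, 9, 12, 25)},
    {"id": 477, "order_status": "2", "create_time": datetime(2020, 6, 18, 9, 12, 25)},
    {"id": 476, "order_status": "2", "create_time": datetime(2020, 6, 18, 2, 21, 38)},
    # hypothetical late record with an older create_time: it loses
    {"id": 476, "order_status": "1", "create_time": datetime(2020, 6, 17, 23, 0, 0)},
]
merged = precombine(records)
```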
4. Insert the data from the CDC tables into the Hudi tables
- insert into order_info_hudi select * from order_info;
-
- select * from order_info_hudi /*+ OPTIONS('read.streaming.enabled'='true') */;
-
- insert into order_detail_hudi select * from order_detail;
-
- select * from order_detail_hudi;
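Notes:
You can also look at the Hudi data path in HDFS. These directories only appear after Flink has completed 5 checkpoints (the default, which is configurable); at first there is only a .hoodie folder.
5. Join the Hudi tables directly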
- SELECT
- od.id,
- oi.id order_id,
- oi.user_id,
- oi.province_id,
- od.sku_id,
- od.sku_name,
- od.sku_num,
- od.order_price,
- oi.create_time,
- oi.operate_time
- FROM
- (
- SELECT *
- FROM order_info_hudi
- WHERE
- order_status = '2' -- paid
- ) oi
- JOIN
- (
- SELECT *
- FROM order_detail_hudi
- ) od
- ON oi.id = od.order_id;
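To check the join logic on a small scale, the query can be replayed in plain Python on the first sample rows inserted earlier. This covers orders 476 to 478 and their details, keeping only the columns needed. It uses batch semantics; as a streaming job, Flink's regular join keeps both inputs in state and updates the result as rows change.

```python
# Batch replay of the join above on sample orders 476-478 and their details
# (only the columns needed here are kept).
order_info_hudi = [
    {"id": 476, "user_id": 10, "province_id": 9, "order_status": "2"},
    {"id": 477, "user_id": 90, "province_id": 3, "order_status": "1"},
    {"id": 478, "user_id": 107, "province_id": 7, "order_status": "1"},
]
order_detail_hudi = [
    {"id": 1329, "order_id": 476, "sku_id": 8},
    {"id": 1330, "order_id": 477, "sku_id": 9},
    {"id": 1331, "order_id": 478, "sku_id": 4},
    {"id": 1332, "order_id": 478, "sku_id": 8},
    {"id": 1333, "order_id": 478, "sku_id": 8},
]


def paid_order_details(orders, details):
    # WHERE order_status = '2' (paid), indexed by order id
    paid = {o["id"]: o for o in orders if o["order_status"] == "2"}
    result = []
    for od in details:
        oi = paid.get(od["order_id"])    # inner JOIN ON oi.id = od.order_id
        if oi is not None:
            result.append({
                "id": od["id"],
                "order_id": oi["id"],
                "user_id": oi["user_id"],
                "province_id": oi["province_id"],
                "sku_id": od["sku_id"],
            })
    return result


rows = paid_order_details(order_info_hudi, order_detail_hudi)
```

Among these orders only 476 has order_status '2', so only detail 1329 survives the join.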
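6. Flink CDC maps columns of the MySQL source table by field name, so the Flink table does not have to declare the source columns one-to-one or in the same order.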
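Name-based matching can be pictured with a small projection function. It is a sketch of the idea, not the connector's code. How the real connector handles a declared column that does not exist in MySQL is not modeled here; the sketch simply raises. The Flink DDL below declares user_id before total_amount, the reverse of their order in MySQL, and still gets the right values.

```python
# Name-based projection: the Flink DDL declares a subset of the MySQL
# columns, and each declared column is looked up by name, not by position.
from datetime import datetime
from decimal import Decimal


def project_by_name(source_row, declared_columns):
    missing = [c for c in declared_columns if c not in source_row]
    if missing:  # this sketch fails fast; the connector's real behavior is not modeled
        raise KeyError(f"not in source table: {missing}")
    return tuple(source_row[c] for c in declared_columns)


# Part of sample row 476 (other MySQL columns omitted for brevity).
mysql_row = {
    "id": 476,
    "consignee": "lAXjcL",
    "total_amount": Decimal("433.00"),
    "order_status": "2",
    "user_id": 10,
    "create_time": datetime(2020, 6, 18, 2, 21, 38),
    "province_id": 9,
}
flink_row = project_by_name(mysql_row, ["id", "user_id", "create_time", "total_amount"])
```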
- CREATE TABLE order_info(
- id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
- user_id BIGINT,
- create_time TIMESTAMP(0),
- total_amount DECIMAL(10, 5)
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '192.168.88.161',
- 'port' = '3306',
- 'username' = 'user',
- 'password' = 'password',
- 'database-name' = 'bigdata',
- 'table-name' = 'order_info',
- 'server-id' = '5401',
- 'scan.incremental.snapshot.enabled'='false'
- );
The query result is as follows: