赞
踩
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将为您详细介绍如何使用自定义表值函数(UDTF),并将处理后的数据存入 MySQL 中。
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库。
创建 MySQL 表
- -- 建表语句,用于向 Source 提供数据
- CREATE TABLE `udtf_input` (
- `id` int(10) NOT NULL,
- `name` varchar(20) DEFAULT '',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8
-
-
- -- 插入数据
- INSERT INTO `udtf_input` (`id`, `name`) VALUES (1, 'Oceanus-1');
- INSERT INTO `udtf_input` (`id`, `name`) VALUES (2, 'Oceanus-2');
- INSERT INTO `udtf_input` (`id`, `name`) VALUES (3, 'Oceanus-3');
-
-
- -- 建表语句,用于接收 Sink 端数据
- CREATE TABLE `udtf_output2` (
- `id` int(10) NOT NULL,
- `name` varchar(20) DEFAULT '',
- `product` varchar(20) DEFAULT '',
- `num` varchar(20) DEFAULT '',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8
开发 UDTF
这里使用 TableFunction 自定义一个 UDTF。这个 UDTF 使用-
将传入进来的字段切分成两个字段后返回。
1. 代码编写
在本地IDE中创建 maven 项目,编写自定义函数 UDTF 的代码。
- // 类名:SplitRowUdtf
- package demos.UDTF;
-
-
- import org.apache.flink.table.annotation.DataTypeHint;
- import org.apache.flink.table.annotation.FunctionHint;
- import org.apache.flink.table.functions.TableFunction;
- import org.apache.flink.types.Row;
-
-
- @FunctionHint(output = @DataTypeHint("ROW<product STRING, num STRING>"))
- public class SplitRowUdtf extends TableFunction<Row> {
- public void eval(String a) {
- String[] split = a.split("-");
- String product = split[0];
- String num = split[1];
- collect(Row.of(product,num));
- }
- }
2. 项目打包
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:
mvn clean package
命令行打包后生成的 JAR 包可以在项目 target 目录下找到。
注意:与 Flink 相关的核心依赖包可以不打进 JAR 包,Oceanus 平台已提供,可将
scope
设置为provided
。具体可参考 Flink 实践教程:入门9-JAR 作业开发[5]。
在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 JAR 包。
在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。单击【作业参数】,在【引用程序包】处选择刚才上传的 JAR 包。
1. 创建 Function
CREATE TEMPORARY SYSTEM FUNCTION SplitRowUdtf AS 'demos.UDTF.SplitRowUdtf';
SplitRowUdtf
代表创建的函数名,demos.UDTF.SplitRowUdtf
代表代码所在路径。
2. 创建 Source
- CREATE TABLE `mysql_cdc_source_table` (
- `id` INT,
- `name` STRING,
- PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
- ) WITH (
- 'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
- 'hostname' = 'xx.xx.xx.xx', -- 数据库的 IP
- 'port' = 'xxxx', -- 数据库的访问端口
- 'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
- 'password' = 'xxxxxxxxx', -- 数据库访问的密码
- 'database-name' = 'testdb', -- 需要同步的数据库
- 'table-name' = 'udtf_input' -- 需要同步的数据表名
- );
- 3. 创建 Sink
- CREATE TABLE `jdbc_upsert_sink_table` (
- `id` INT,
- `name` VARCHAR,
- `product` VARCHAR,
- `num` VARCHAR,
- PRIMARY KEY(id) NOT ENFORCED
- ) WITH (
- -- 指定数据库连接参数
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数
- 'table-name' = 'udtf_output2', -- 需要写入的数据表
- 'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)
- 'password' = 'xxxxxxxxx', -- 数据库访问的密码
- 'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数
- 'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔
- );
4. 编写业务 SQL
- -- cross join 写法
- INSERT INTO jdbc_upsert_sink_table
- SELECT
- S.id,S.name,T.product,T.num
- FROM mysql_cdc_source_table AS S,
- lateral table(SplitRowUdtf(name)) AS T(product,num);
-
-
- -- left join 写法
- INSERT INTO jdbc_upsert_sink_table
- SELECT
- S.id,S.name,T.product,T.num
- FROM mysql_cdc_source_table AS S
- left join lateral table(SplitRowUdtf(name)) AS T(product,num) on true;
UDTF 支持 cross join 和 left join,在使用 UDTF 时需要添加 lateral 和 table 关键字。使用 cross join 时,左表的每一行数据都会关联上 UDTF 产出的每一行数据,如果 UDTF 不产出任何数据,则这 1 行不会输出;使用 left join 时,左表的每一行数据都会关联上 UDTF 产出的每一行数据,如果 UDTF 不产出任何数据,则这 1 行的 UDTF 的字段会用 null 值填充。
本文首先在本地开发 UDTF 函数,将其打成 JAR 包后上传到 Oceanus 平台引用。接下来使用 MySQL CDC 连接器获取udtf_input
表数据,调用 UDTF 函数将name
字段切分成两个字段后存入 MySQL 中。UDTF 可以通过多次调用 collect() 实现将 1 行的数据转为多行返回。还可以将返回值声明成 Tuple 或 Row 类型即可实现 1 列转多列(如本文所示)。
自定义标量函数(UDF)只能将0个、1个或多个标量值映射到一个新的标量值。
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] MySQL 控制台:https://console.cloud.tencent.com/cdb
[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433
[5] Flink 实践教程:入门9-JAR 作业开发:https://cloud.tencent.com/developer/article/1907822
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。