赞
踩
DataGen 连接器允许按数据生成规则进行读取。
DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。
DataGen 连接器是内置的。
注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。
表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。
每个列,都有两种生成数据的方法:
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
)
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必须 | (none) | String | 指定要使用的连接器,这里是 ‘datagen’。 |
rows-per-second | 可选 | 10000 | Long | 每秒生成的行数,用以控制数据发出速率。 |
fields.#.kind | 可选 | random | String | 指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。 |
fields.#.min | 可选 | (Minimum value of type) | (Type of field) | 随机生成器的最小值,适用于数字类型。 |
fields.#.max | 可选 | (Maximum value of type) | (Type of field) | 随机生成器的最大值,适用于数字类型。 |
fields.#.length | 可选 | 100 | Integer | 随机生成器生成字符的长度,适用于 char、varchar、string。 |
fields.#.start | 可选 | (none) | (Type of field) | 序列生成器的起始值。 |
fields.#.end | 可选 | (none) | (Type of field) | 序列生成器的结束值。 |
1、编辑 docker-compose.yml
version: "3"
services:
jobmanager:
image: flink:1.14.4-scala_2.11
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:1.14.4-scala_2.11
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 4
2、创建执行程序
package quick.table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class TableExample {
public static void main(String[] args) throws Exception {
String sql="CREATE TABLE source_table (\n" +
" user_id INT,\n" +
" cost DOUBLE,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.user_id.kind'='random',\n" +
" 'fields.user_id.min'='1',\n" +
" 'fields.user_id.max'='10',\n" +
"\n" +
" 'fields.cost.kind'='random',\n" +
" 'fields.cost.min'='1',\n" +
" 'fields.cost.max'='100'\n" +
")\n";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(sql);
// 执行查询
Table table = tableEnv.sqlQuery("select * from source_table");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
}
}
3、启动服务
$ docker-compose up -d
4、打印结果
然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。
$ docker-compose logs -f taskmanager
taskmanager_1 | +I[4, 60.06509260823151, 2022-04-13T11:01:30.349]
taskmanager_1 | +I[9, 22.444427031038334, 2022-04-13T11:01:30.349]
......
源码已提交
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。