当前位置:   article > 正文

【Flink1.14实战】Docker环境Flink Sql DataGen 快速开始_sql-cli-for-apache-flink-docker

sql-cli-for-apache-flink-docker
DataGen SQL 连接器

DataGen 连接器允许按数据生成规则进行读取。

DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。

DataGen 连接器是内置的。

注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。

创建一个 DataGen 的表

表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。

每个列,都有两种生成数据的方法:

  • 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。
  • 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。
CREATE TABLE datagen (
 f_sequence INT,
 f_random INT,
 f_random_str STRING,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',

 -- optional options --

 'rows-per-second'='5',

 'fields.f_sequence.kind'='sequence',
 'fields.f_sequence.start'='1',
 'fields.f_sequence.end'='1000',

 'fields.f_random.min'='1',
 'fields.f_random.max'='1000',

 'fields.f_random_str.length'='10'
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
连接器参数
参数是否必选默认值数据类型描述
connector必须(none)String指定要使用的连接器,这里是 ‘datagen’。
rows-per-second可选10000Long每秒生成的行数,用以控制数据发出速率。
fields.#.kind可选randomString指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。
fields.#.min可选(Minimum value of type)(Type of field)随机生成器的最小值,适用于数字类型。
fields.#.max可选(Maximum value of type)(Type of field)随机生成器的最大值,适用于数字类型。
fields.#.length可选100Integer随机生成器生成字符的长度,适用于 char、varchar、string。
fields.#.start可选(none)(Type of field)序列生成器的起始值。
fields.#.end可选(none)(Type of field)序列生成器的结束值。
实战

基于docker-compose

1、编辑 docker-compose.yml

version: "3"
services:
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4   

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2、创建执行程序

package quick.table;


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableExample {
    public static void main(String[] args) throws Exception {
        String sql="CREATE TABLE source_table (\n" +
                "    user_id INT,\n" +
                "    cost DOUBLE,\n" +
                "    ts AS localtimestamp,\n" +
                "    WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'rows-per-second'='5',\n" +
                "\n" +
                "    'fields.user_id.kind'='random',\n" +
                "    'fields.user_id.min'='1',\n" +
                "    'fields.user_id.max'='10',\n" +
                "\n" +
                "    'fields.cost.kind'='random',\n" +
                "    'fields.cost.min'='1',\n" +
                "    'fields.cost.max'='100'\n" +
                ")\n";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(sql);
        // 执行查询
        Table table = tableEnv.sqlQuery("select * from source_table");

        DataStream<Row> resultStream = tableEnv.toDataStream(table);

        // add a printing sink and execute in DataStream API
        resultStream.print();

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

3、启动服务

$ docker-compose up -d
  • 1

4、打印结果
在这里插入图片描述

然后,将打包应用程序提交,FlinkWeb UI来提交作业监控集群的状态和正在运行的作业。

$ docker-compose logs -f taskmanager
taskmanager_1  | +I[4, 60.06509260823151, 2022-04-13T11:01:30.349]
taskmanager_1  | +I[9, 22.444427031038334, 2022-04-13T11:01:30.349]
......
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述
源码已提交
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/391201?site
推荐阅读
相关标签
  

闽ICP备14008679号