当前位置:   article > 正文

Flink Sql光速入门_flinksql使用

flinksql使用

一、概括

实时开发现在主要是Flink,使用Flink的DataStreaming开发的门槛较高,需要对java和Flink的算子熟悉才能入手。对于绝大部分的大数据开发人员,肯定是对sql非常的熟悉,只要熟悉sql,那么就能很快学会flink sql。

flink sql相对hive sql和mysql来说,只是个别语法的不同,大部分还是相同的写法。只要看完本篇文章并且按照教程来练习,一天内你就能做实时报表,一周内你就能独立开发并运维实时任务。

进入该教程前,首先要对sql熟悉,要有kafka和MySQL,然后demo的环境需要有yarn和Flink包,下面是进入flink 客户端的教程。

1、进入flink包目录下

2、启动一个Flink Session
bin/yarn-session.sh -yn 3 -ys 3 -yjm 2048 -ytm 5120 -ynm flink_session_testn -d &
  • 1

3、启动flink客户端
 bin/sql-client.sh embedded -s yarn-session
  • 1

进入这个小松鼠界面表示就可以操作了

可以配置个查询展示的配置,展示更好看

SET 'sql-client.execution.result-mode' = 'tableau';


execution.runtime-mode:可视化结果模式
表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
	SET 'sql-client.execution.result-mode' = 'table';

变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流:
	SET 'sql-client.execution.result-mode' = 'changelog';
	
Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type)SET 'sql-client.execution.result-mode' = 'tableau';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

接下来让我们愉快的学习吧。

二、日期函数

1、获取当前本地时间
函数名返回值类型备注
LOCALTIMELOCALTIME返回本地时区的当前 SQL 时间,返回类型为 TIME(0)
LOCALTIMESTAMPTIMESTAMP_LTZ(3)返回本地时区的当前 SQL 时间,返回类型为 TIMESTAMP(3)
CURRENT_TIMELOCALTIME返回本地时区的当前 SQL 时间,这是 LOCAL_TIME 的同义词。
CURRENT_DATEDATE返回本地时区中的当前 SQL 日期
CURRENT_TIMESTAMPTIMESTAMP_LTZ(3)返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP_LTZ(3)
NOW()TIMESTAMP_LTZ(3)返回本地时区的当前 SQL 时间戳,这是 CURRENT_TIMESTAMP 的同义词
CURRENT_ROW_TIMESTAMP()TIMESTAMP_LTZ(3)返回本地时区的当前 SQL 时间戳,返回类型为 TIMESTAMP_LTZ(3)
UNIX_TIMESTAMP()int返回当前时间戳

flink 客户端下查询

select LOCALTIME,LOCALTIMESTAMP,CURRENT_TIME,CURRENT_DATE,CURRENT_TIMESTAMP,NOW(),CURRENT_ROW_TIMESTAMP();
  • 1

查询结果

pCWGOtH.png

2、日期转换函数
函数名称入参返回值类型
DATE_FORMAT(timestamp/string, string)第一个参数为时间,类型可以为时间类型,也可以为字符串类型,第二个参数为匹配字段string
TO_DATE(string1[, string2])参数只能为string类型,如果参数为时间类型,则要类型转换一下,如select to_date(cast(now() as string))date
TO_TIMESTAMP(string1)入参为字符串时间格式:TIMESTAMP
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)两个时间戳查,SECOND,MINUTE,HOUR,DAY,MONTH 或 YEAR。select TIMESTAMPDIFF(MINUTE, TIMESTAMP ‘2003-01-03 10:10:00’, TIMESTAMP ‘2003-01-03 10:00:00’); --结果为-10 后面-前面的 分钟差.int
UNIX_TIMESTAMP(string1[, string2])字符串时间返回为时间戳,第一个参数是时间,第二个是匹配时间的格式int
FROM_UNIXTIME(numeric[, string])把时间戳转换为时间,默认格式为yyyy-MM-dd HH:mm:ssstring
CONVERT_TZ(string1, string2, string3)时间时区转换函数string
TIMESTAMPADD(timeintervalunit, interval, timepoint)时间加减函数,第三个参数时间要为timestamp格式timestamp

查询

select 
DATE_FORMAT(now(), 'yyyy-MM-dd') as DATE_FORMAT,
TO_DATE('2023-01-01 11:11:12') as TO_DATE,
TO_TIMESTAMP('2023-01-01 11:11:12') as TO_TIMESTAMP,
TIMESTAMPDIFF(MINUTE, TIMESTAMP '2023-01-01 11:11:12', TIMESTAMP '2023-01-01 12:11:12') as TIMESTAMPDIFF,
UNIX_TIMESTAMP('2023-01-01 11:11:127','yyyy-MM-dd HH:mm:ss') as UNIX_TIMESTAMP,
FROM_UNIXTIME(1689061811,'yyyy-MM-dd HH:mm:ss') as FROM_UNIXTIME, 
CONVERT_TZ('2023-01-01 11:11:12','UTC','America/Los_Angeles') as CONVERT_TZ, 
TIMESTAMPADD(MINUTE, -5, cast(NOW() as timestamp)) TIMESTAMPADD
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

结果

pCWwCxH.png

3、时间获取函数
函数参数返回值
YEAR(date)date/timestampint 年
QUARTER(date)date/timestampint 季度(1 到 4 之间的整数)
MONTH(date)date/timestampint 月份(1 到 12 之间的整数)
WEEK(date)date/timestampint 周(1 到 53 之间的整数)
DAYOFYEAR(date)date/timestampint 当年的第几天1 到 366 之间的整数)
DAYOFMONTH(date)date/timestampint 当月的第几天(1 到 31 之间的整数)
DAYOFWEEK(date)date/timestampint 周几(1 到 7 之间的整数)
HOUR(timestamp)date/timestampint 小时(如何参数是date则为0) (0 到 23 之间的整数)
MINUTE(timestamp)date/timestampint 分钟(0 到 59 之间的整数)
SECOND(timestamp)date/timestampint 秒 (0 到 59 之间的整数)
FLOOR(timepoint TO timeintervalunit)Minutestring 秒数向下取整(例如 CEIL(TIME '12:44:31' TO MINUTE) 返回 12:44:00。)
CEIL(timespoint TO timeintervaluntit)Minute秒数向上取整(例如 CEIL(TIME '12:44:31' TO MINUTE) 返回 12:45:00。)

查询

select
YEAR(now()) as `YEAR`,
QUARTER(now()) as `QUARTER`,
MONTH(now()) as `MONTH`,
WEEK(now()) as `WEEK`,
DAYOFYEAR(now()) as `DAYOFYEAR`,
DAYOFMONTH(now()) as `DAYOFMONTH`,
DAYOFWEEK(now()) as `DAYOFWEEK`,
HOUR(now()) as `HOUR`,
MINUTE(now()) as `MINUTE`,
SECOND(now()) as `SECOND`
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

结果

pCfBRmj.png

三、聚合函数

函数说明
COUNT(*)返回输入行数。使用 DISTINCT 则对所有值去重后计算。
AVG([ ALL DISTINCT ] expression)DISTINCT ] expression)求平均值,使用 DISTINCT 则对所有值去重后计算。
SUM([ ALL DISTINCT ] expression)求和,使用 DISTINCT 则对所有值去重后计算。
MAX([ ALL DISTINCT ] expression)最大值,DISTINCT 则对所有值去重后计算
MIN([ ALL DISTINCT ] expression )最小值,DISTINCT 则对所有值去重后计算
STDDEV_POP([ ALL DISTINCT ] expression)总体标准偏差 DISTINCT 则对所有值去重后计算
STDDEV_SAMP([ ALL DISTINCT ] expression)个体标准偏差 DISTINCT 则对所有值去重后计算
VAR_POP([ ALL DISTINCT ] expression)总体方差DISTINCT 则对所有值去重后计算
VAR_SAMP([ ALL DISTINCT ] expression)样本方差 DISTINCT 则对所有值去重后计算
COLLECT([ ALL DISTINCT ] expression)返回跨所有输入行的多组表达式。NULL 值将被忽略,使用 DISTINCT 则对所有值去重后计算。
VARIANCE([ ALL DISTINCT ] expression)VAR_SAMP() 的同义方法。
RANK()非连续排名
DENSE_RANK()重复连续排名
ROW_NUMBER()连续非重复排名
LEAD(expression [, offset] [, default])下一个值
LAG(expression [, offset] [, default])前一个值
FIRST_VALUE(expression)第一个值
LAST_VALUE(expression)最后一个值
LISTAGG(expression [, separator])连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。相当于mysql中的group_concat,但是不能排序和去重
CUME_DIST()返回值在一组值的累积分布。结果是小于或等于当前行的值的行数除以窗口分区的总行数。
PERCENT_RANK()返回值在一组值的百分比排名。结果是当前行在窗口分区中的排名减 1,然后除以窗口分区的总行数减 1。如果窗口分区的总行数为 1,则该函数返回 0。
NTILE(n)将窗口分区中的所有数据按照顺序划分为 n 个分组,返回分配给各行数据的分组编号(从 1 开始)。 如果不能均匀划分为 n 个分组,则从第 1 个分组开始,为每一分组分配一个剩余值。 比如某个窗口分区有 6 行数据,划分为 4 个分组,则各行的分组编号为:1,1,2,2,3,4。

四、窗口函数

窗口函数有两个版本,普通版本和最新的Windowing table-valued functions (Windowing TVFs)版本,以下称为TVF版本

1、数据准备
1) 造数据

我使用的是java随机造数据写到kafka里,代码可以直接复制就可执行。

随便写一份相同的数据到mysql,已经生成了写数据的sql,直接复制执行即可,建表语句在下面,生成相同的数据是为了验证实时跑出来的数据是否正确。

kafka依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

造数据


public class Test {

    public static void main(String[] args) throws ParseException {
        String[] cityArr = {"成都","乐山","甘孜","绵阳","德阳","阿坝"};
        String[] modelArr = {"xiaomi 5","xiaomi 6","xiaomi 7","xiaomi 8","xiaomi 9","xiaomi 10","xiaomi 11","xiaomi 12","xiaomi 13" };
        String province = "四川";
        StringBuffer buffer = new StringBuffer("insert into burying_point(id, event_time_sp, load_long_time, event_sp, brand_sp, model_sp, os_sp, ip_sp, city_sp, province_sp, user_id_sp) values \n"); 
        // 开始时间
        long time = 1672502400000L; // 2023-01-01 00:00:00
        int size = 1000;
        for (int i = 0; i < size; i++) {
            burying_point bean = new burying_point();
            bean.setId((long)(Math.random() * 100000000 + 1000000000 ));
            bean.setEvent_time_sp(DateToolUtils.getFormatDateTime(new Date(time)));
            time = time + (long)(Math.random() * 30000 + 1000 ); // 下一个时间
            bean.setLoad_long_time((long)(Math.random() * 100 + 10));
            bean.setEvent_sp("CLICK");
            bean.setBrand_sp("小米");
            bean.setModel_sp(modelArr[ (int)(Math.random() * 1000000 + 10) %9]);
            bean.setOs_sp("安卓");
            bean.setIp_sp("localhost");
            bean.setCity_sp(cityArr[ (int)(Math.random() * 1000000 + 10) %6]);
            bean.setProvince_sp(province);
            bean.setUser_id_sp("10000000"+((long)(Math.random() * 1000000 + 10 ) % 100) +"");
            
            // 写数据到kafka
            sendDataToKafka(JSONObject.toJSONString(bean), "burying_point_kafka_topic");
            buffer.append("(")
                    .append(bean.getId()).append(",")
                    .append(getFiled(bean.getEvent_time_sp())).append(",")
                    .append(bean.getLoad_long_time()).append(",")
                    .append(getFiled(bean.getEvent_sp())).append(",")
                    .append(getFiled(bean.getBrand_sp())).append(",")
                    .append(getFiled(bean.getModel_sp())).append(",")
                    .append(getFiled(bean.getOs_sp())).append(",")
                    .append(getFiled(bean.getIp_sp())).append(",")
                    .append(getFiled(bean.getCity_sp())).append(",")
                    .append(getFiled(bean.getProvince_sp())).append(",")
                    .append(getFiled(bean.getUser_id_sp()))
                    .append(") ");
            if (i <  size -1){
                buffer.append(",\n");
            }
        }
        System.out.println(buffer.toString());

    }

    public static String getFiled(String filed){

        if (StringUtils.isEmpty(filed)){
            return "null";
        }else {
            return "'"+filed+"'";
        }
    }
    public static void sendDataToKafka(String data,String topic){
        Properties prop = new Properties();
        prop.put("bootstrap.servers","kafka:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks","all");
        prop.put("retries",0);
        prop.put("batch.size",16384);
        prop.put("linger.ms",1);
        prop.put("buffer.memory",33554432);
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),data));
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
2) 表结构

数据源

-- mysql table
CREATE TABLE burying_point(
    `id`             bigint(20)    NOT NULL COMMENT '主键id',
    `event_time_sp`  datetime      NOT NULL COMMENT '事件时间',
    `load_long_time` bigint(20)    NOT NULL COMMENT '加载时间(s)',
    `event_sp`       varchar(1000) NOT NULL COMMENT '事件名称',
    `brand_sp`       varchar(1000) DEFAULT NULL COMMENT '设备品牌',
    `model_sp`       varchar(1000) DEFAULT NULL COMMENT '设备型号',
    `os_sp`          varchar(1000) DEFAULT NULL COMMENT '操作系统',
    `ip_sp`          varchar(1000) DEFAULT NULL COMMENT 'IP',
    `city_sp`        varchar(1000) DEFAULT NULL COMMENT '城市',
    `province_sp`    varchar(1000) DEFAULT NULL COMMENT '省份',
    `user_id_sp`     varchar(1000) DEFAULT NULL COMMENT '用户ID',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB    DEFAULT CHARSET = utf8  COLLATE = utf8_bin COMMENT ='埋点造数据';

 
-- flink table
CREATE TABLE IF NOT EXISTS burying_point(
    id             bigint COMMENT '主键id',
    event_time_sp  timestamp(3)  NOT NULL COMMENT '事件时间',
    load_long_time bigint        NOT NULL COMMENT '加载时间(s)',
    event_sp       varchar(1000) NOT NULL COMMENT '事件名称',
    brand_sp       varchar(1000) NULL COMMENT '设备品牌',
    model_sp       varchar(1000) NULL COMMENT '设备型号',
    os_sp          varchar(1000) NULL COMMENT '操作系统',
    ip_sp          varchar(1000) NULL COMMENT 'IP',
    city_sp        varchar(1000) NULL COMMENT '城市',
    province_sp    varchar(1000) NULL COMMENT '省份',
    user_id_sp     varchar(1000) NULL COMMENT '用户ID',
    process_time   as proctime(),
    WATERMARK FOR event_time_sp AS event_time_sp - INTERVAL '5' SECOND 
)WITH (
    'connector' = 'kafka'
    ,'topic' = 'burying_point_kafka_topic_name'
    ,'properties.bootstrap.servers' = 'localhost:9092'
    ,'properties.group.id' = 'test_group_id'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'format' = 'json'
    ,'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

结果表

-- mysql table
CREATE TABLE result_table(
    `dt`                 date       NOT NULL COMMENT '日期',
    `window_start`       datetime   NOT NULL COMMENT '开始时间',
    `window_time`         datetime   NOT NULL COMMENT '结束时间',
    `pv`                 bigint(20) NOT NULL COMMENT 'pv',
    `uv`                 bigint(20) NOT NULL COMMENT 'uv',
    `load_long_time_max` bigint(20) DEFAULT NULL COMMENT '加载最大时间',
    `load_long_time_min` bigint(20) DEFAULT NULL COMMENT '加载最小时间',
    `load_long_time_avg` bigint(20) DEFAULT NULL COMMENT '加载平均时间',
    `city_cnt`           bigint(20) DEFAULT NULL COMMENT '城市数量',
    `phone_cnt`          bigint(20) DEFAULT NULL COMMENT '手机数量',
    PRIMARY KEY (dt, window_start,window_time)
) ENGINE = InnoDB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT ='埋点造数据result';

-- flink table
CREATE TABLE result_table(
    `dt`                 date COMMENT '日期',
    `window_start`       timestamp(0) COMMENT '开始时间',
    `window_time`         timestamp(0) COMMENT '结束时间',
    `pv`                 bigint comment 'pv',
    `uv`                 bigint comment 'uv',
    `load_long_time_max` bigint comment '加载最大时间',
    `load_long_time_min` bigint comment '加载最小时间',
    `load_long_time_avg` bigint comment '加载平均时间',
    `city_cnt`           bigint comment '城市数量',
    `phone_cnt`          bigint comment '手机数量',
    PRIMARY KEY (dt, window_time) NOT ENFORCED
) WITH (
   'connector' = 'jdbc'
    ,'url' = 'jdbc:mysql://localhost:3306/bigdata'
    ,'table-name' = 'result_table'
    ,'username' = 'root'
    ,'password' = '123456'
);



CREATE TABLE result_table_hop(
    `dt`                 date       NOT NULL COMMENT '日期',
    `window_start`       datetime   NOT NULL COMMENT '开始时间',
    `window_time`         datetime   NOT NULL COMMENT '结束时间',
    `pv`                 bigint(20) NOT NULL COMMENT 'pv',
    `uv`                 bigint(20) NOT NULL COMMENT 'uv',
    `load_long_time_max` bigint(20) DEFAULT NULL COMMENT '加载最大时间',
    `load_long_time_min` bigint(20) DEFAULT NULL COMMENT '加载最小时间',
    `load_long_time_avg` bigint(20) DEFAULT NULL COMMENT '加载平均时间',
    `city_cnt`           bigint(20) DEFAULT NULL COMMENT '城市数量',
    `phone_cnt`          bigint(20) DEFAULT NULL COMMENT '手机数量',
    PRIMARY KEY (dt, window_start,window_time)
) ENGINE = InnoDB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT ='埋点造数据result-滑动窗口结果表';



CREATE TABLE result_table_hop(
    `dt`                 date COMMENT '日期',
    `window_start`       timestamp(0) COMMENT '开始时间',
    `window_time`         timestamp(0) COMMENT '结束时间',
    `pv`                 bigint comment 'pv',
    `uv`                 bigint comment 'uv',
    `load_long_time_max` bigint comment '加载最大时间',
    `load_long_time_min` bigint comment '加载最小时间',
    `load_long_time_avg` bigint comment '加载平均时间',
    `city_cnt`           bigint comment '城市数量',
    `phone_cnt`          bigint comment '手机数量',
    PRIMARY KEY (dt, window_time) NOT ENFORCED
) WITH (
   'connector' = 'jdbc'
    ,'url' = 'jdbc:mysql://localhost:3306/bigdata'
    ,'table-name' = 'result_table_hop'
    ,'username' = 'root'
    ,'password' = '123456'
);


CREATE TABLE result_table_cumulate(
    `dt`                 date       NOT NULL COMMENT '日期',
    `window_start`       datetime   NOT NULL COMMENT '开始时间',
    `window_time`         datetime   NOT NULL COMMENT '结束时间',
    `pv`                 bigint(20) NOT NULL COMMENT 'pv',
    `uv`                 bigint(20) NOT NULL COMMENT 'uv',
    `load_long_time_max` bigint(20) DEFAULT NULL COMMENT '加载最大时间',
    `load_long_time_min` bigint(20) DEFAULT NULL COMMENT '加载最小时间',
    `load_long_time_avg` bigint(20) DEFAULT NULL COMMENT '加载平均时间',
    `city_cnt`           bigint(20) DEFAULT NULL COMMENT '城市数量',
    `phone_cnt`          bigint(20) DEFAULT NULL COMMENT '手机数量',
    PRIMARY KEY (dt, window_start,window_time)
) ENGINE = InnoDB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT ='埋点造数据result-累计窗口结果表';



CREATE TABLE result_table_cumulate(
    `dt`                 date COMMENT '日期',
    `window_start`       timestamp(0) COMMENT '开始时间',
    `window_time`         timestamp(0) COMMENT '结束时间',
    `pv`                 bigint comment 'pv',
    `uv`                 bigint comment 'uv',
    `load_long_time_max` bigint comment '加载最大时间',
    `load_long_time_min` bigint comment '加载最小时间',
    `load_long_time_avg` bigint comment '加载平均时间',
    `city_cnt`           bigint comment '城市数量',
    `phone_cnt`          bigint comment '手机数量',
    PRIMARY KEY (dt, window_time) NOT ENFORCED
) WITH (
   'connector' = 'jdbc'
    ,'url' = 'jdbc:mysql://localhost:3306/bigdata'
    ,'table-name' = 'result_table_cumulate'
    ,'username' = 'root'
    ,'password' = '123456'
);


CREATE TABLE result_table_session(
    `dt`                 date       NOT NULL COMMENT '日期',
    `window_start`       datetime   NOT NULL COMMENT '开始时间',
    `window_time`         datetime   NOT NULL COMMENT '结束时间',
    `pv`                 bigint(20) NOT NULL COMMENT 'pv',
    `uv`                 bigint(20) NOT NULL COMMENT 'uv',
    `load_long_time_max` bigint(20) DEFAULT NULL COMMENT '加载最大时间',
    `load_long_time_min` bigint(20) DEFAULT NULL COMMENT '加载最小时间',
    `load_long_time_avg` bigint(20) DEFAULT NULL COMMENT '加载平均时间',
    `city_cnt`           bigint(20) DEFAULT NULL COMMENT '城市数量',
    `phone_cnt`          bigint(20) DEFAULT NULL COMMENT '手机数量',
    PRIMARY KEY (dt, window_start,window_time)
) ENGINE = InnoDB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT ='埋点造数据result-会话窗口结果表';



CREATE TABLE result_table_session(
    `dt`                 date COMMENT '日期',
    `window_start`       timestamp(0) COMMENT '开始时间',
    `window_time`         timestamp(0) COMMENT '结束时间',
    `pv`                 bigint comment 'pv',
    `uv`                 bigint comment 'uv',
    `load_long_time_max` bigint comment '加载最大时间',
    `load_long_time_min` bigint comment '加载最小时间',
    `load_long_time_avg` bigint comment '加载平均时间',
    `city_cnt`           bigint comment '城市数量',
    `phone_cnt`          bigint comment '手机数量',
    PRIMARY KEY (dt, window_time) NOT ENFORCED
) WITH (
   'connector' = 'jdbc'
    ,'url' = 'jdbc:mysql://localhost:3306/bigdata'
    ,'table-name' = 'result_table_session'
    ,'username' = 'root'
    ,'password' = '123456'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
2、滚动窗口(TUMBLE)

官网地址

[]: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/#tumble “滚动窗口”

滚动窗口要指定固定时长作为窗口大小,窗口不会重叠,窗口开始时间和结束时间是当前时间模上窗户时长,比如5分钟的窗口,窗口大小就是从[0,5),[5,10),[10,15),不在于程序启动的时间。

语法

在使用TVF开窗函数中,

flink自定义了三个原始字段,分别是:

window_start”, “window_end”, “window_time

其中window_start是窗口的开始时间

window_time是窗口结束时间

window_time与window_end关系为:window_time = window_end - 1ms

在使用普通开窗函数中

flink自定义了三个原始字段,分别是:

TUMBLE_STARTTUMBLE_ENDTUMBLE_ROWTIME

其中 TUMBLE_ROWTIME = TUMBLE_END - 1ms

普通开窗函数TVF开窗函数备注
window_startTUMBLE_START开窗开始时间
window_endTUMBLE_END下个开窗开始时间
window_timeTUMBLE_ROWTIME开窗结束时间

pCfoK2Q.png

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
  • 1
  • data: 表名
  • timecol: 指示数据的哪个时间属性列应映射到翻滚窗口
  • size: 指定翻滚窗口宽度的持续时间。

如何你的flink版本为13及以上,建议使用TVF窗口函数,低于13版本使用普通窗口函数,我使用的是17版本和13版本测试,其中国17版本已经不支持普通窗口函数,只能用TVF窗口函数。

1) 普通窗口函数

需求:计算每五分钟的指标数据

insert into result_table
select to_date(cast(event_time_sp as string))             as dt,
       tumble_start(event_time_sp, interval '5' minute)   as window_start,
       tumble_rowtime(event_time_sp, interval '5' minute) as tumble_time,
       count(*)                                           as pv,
       count(distinct user_id_sp)                         as uv,
       max(load_long_time)                                as load_long_time_max,
       min(load_long_time)                                as load_long_time_min,
       avg(load_long_time)                                as load_long_time_avg,
       count(distinct city_sp)                            as city_cnt,
       count(distinct model_sp)                           as phone_cnt
from burying_point
group by to_date(cast(event_time_sp as string)), tumble(event_time_sp, interval '5' minute);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
2) TVF窗口函数
insert into result_table
select to_date(cast(event_time_sp as string)) as dt,
       window_start,
       window_time,
       count(*)                            as pv,
       count(distinct user_id_sp)          as uv,
       max(load_long_time)                 as load_long_time_max,
       min(load_long_time)                 as load_long_time_min,
       avg(load_long_time)                 as load_long_time_avg,
       count(distinct city_sp)             as city_cnt,
       count(distinct model_sp)            as phone_cnt
from TABLE(TUMBLE(
         TABLE burying_point,
         DESCRIPTOR(event_time_sp),
         INTERVAL '5' minute))
group by to_date(cast(event_time_sp as string)),window_start,window_time
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
3) 数据验证
select rt.dt,
       rt.window_start,
       rt.window_time,
       max(rt.pv)                 as pv,
       count(*)                   as pb_pv,
       max(rt.uv)                 as uv,
       count(distinct user_id_sp) as pb_uv,
       max(rt.load_long_time_max) as load_long_time_max,
       max(load_long_time)        as pb_load_long_time_max,
       max(rt.load_long_time_min) as load_long_time_min,
       min(load_long_time)        as pb_load_long_time_min,
       max(rt.load_long_time_avg) as load_long_time_avg,
       avg(load_long_time)        as pb_load_long_time_avg,
       max(rt.city_cnt)           as city_cnt,
       count(distinct city_sp)    as pb_city_cnt,
       max(rt.phone_cnt)          as phone_cnt,
       count(distinct model_sp)   as pb_phone_cnt
from burying_point bp
inner join result_table rt on rt.dt = date(bp.event_time_sp) and bp.event_time_sp between rt.window_start and rt.window_time
group by rt.dt, rt.window_start, rt.window_time;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

其中平均值不太一样,因为保留了小数,如果直接平均值结果取整结果完全一样,简直完美。

3、滑动窗口(HOP)

官网地址

[]: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/#hop “滑动窗口”

滑动窗口有两个参数,一个是窗口大小,另一个是滑动步长,如果步长等于窗口,那么就等于滚动窗口。

需求:每过一分钟计算五分钟内的指标,即窗口长度为5分钟,步长为1分钟。

1) 普通窗口函数
insert into result_table_hop
select to_date(cast(event_time_sp as string))             as dt,
       hop_start(event_time_sp,interval '1' minute, interval '5' minute)   as window_start,
       hop_rowtime(event_time_sp,interval '1' minute, interval '5' minute)   as window_end,
       count(*)                                           as pv,
       count(distinct user_id_sp)                         as uv,
       max(load_long_time)                                as load_long_time_max,
       min(load_long_time)                                as load_long_time_min,
       avg(load_long_time)                                as load_long_time_avg,
       count(distinct city_sp)                            as city_cnt,
       count(distinct model_sp)                           as phone_cnt
from burying_point
group by to_date(cast(event_time_sp as string)), hop(event_time_sp,interval '1' minute, interval '5' minute);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
2) TVF窗口函数
insert into result_table_hop
select to_date(cast(event_time_sp as string)) as dt,
       window_start,
       window_time,
       count(*)                            as pv,
       count(distinct user_id_sp)          as uv,
       max(load_long_time)                 as load_long_time_max,
       min(load_long_time)                 as load_long_time_min,
       avg(load_long_time)                 as load_long_time_avg,
       count(distinct city_sp)             as city_cnt,
       count(distinct model_sp)            as phone_cnt
from TABLE(HOP(
         TABLE burying_point,
         DESCRIPTOR(event_time_sp),
         INTERVAL '1' minute,INTERVAL '5' minute))
group by to_date(cast(event_time_sp as string)),window_start,window_time
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
3) 数据验证

select rt.dt,
       rt.window_start,
       rt.window_time,
       max(rt.pv)                 as pv,
       count(*)                   as pb_pv,
       max(rt.uv)                 as uv,
       count(distinct user_id_sp) as pb_uv,
       max(rt.load_long_time_max) as load_long_time_max,
       max(load_long_time)        as pb_load_long_time_max,
       max(rt.load_long_time_min) as load_long_time_min,
       min(load_long_time)        as pb_load_long_time_min,
       max(rt.load_long_time_avg) as load_long_time_avg,
       avg(load_long_time)        as pb_load_long_time_avg,
       max(rt.city_cnt)           as city_cnt,
       count(distinct city_sp)    as pb_city_cnt,
       max(rt.phone_cnt)          as phone_cnt,
       count(distinct model_sp)   as pb_phone_cnt
from burying_point bp
inner join result_table_hop rt on rt.dt = date(bp.event_time_sp) and bp.event_time_sp between rt.window_time and rt.window_time
group by rt.dt,rt.window_start,rt.window_time
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

数据完成一致

4、累积窗口(CUMULATE)

官网地址

[]: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/#cumulate “累积窗口”

CUMULATE累计窗口是滚动窗口的一个升级版,先确定一个滚动窗口,然后在滚动窗口内部再进行汇总,每次汇总给都是从当前时间当窗口时间的累积汇总。

1) 普通窗口函数

由于累计窗口是比较高级的功能,低版本并不支持,只支持TVF

2) TVF窗口函数
insert into result_table_cumulate
select to_date(cast(event_time_sp as string)) as dt,
       window_start,
       window_time,
       count(*)                            as pv,
       count(distinct user_id_sp)          as uv,
       max(load_long_time)                 as load_long_time_max,
       min(load_long_time)                 as load_long_time_min,
       avg(load_long_time)                 as load_long_time_avg,
       count(distinct city_sp)             as city_cnt,
       count(distinct model_sp)            as phone_cnt
from TABLE(CUMULATE(
         TABLE burying_point,
         DESCRIPTOR(event_time_sp),
         INTERVAL '1' minute,INTERVAL '5' minute))
group by to_date(cast(event_time_sp as string)),window_start,window_time
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
3) 数据验证

我随机筛选了几条是对的上的,可以自行验证

5、会话窗口(Session)

官网地址:官网上没有找到,提示这个:Session Windows (will be supported soon)

Session窗口没有固定的时间 只有最小的间隔时间没有数据就会关闭之前那个窗口

1) 普通窗口函数
insert into result_table_session
select to_date(cast(event_time_sp as string))             as dt,
       session_start(event_time_sp,interval '30' second)   as window_start,
       session_rowtime(event_time_sp,interval '30' second)   as window_end,
       count(*)                                           as pv,
       count(distinct user_id_sp)                         as uv,
       max(load_long_time)                                as load_long_time_max,
       min(load_long_time)                                as load_long_time_min,
       avg(load_long_time)                                as load_long_time_avg,
       count(distinct city_sp)                            as city_cnt,
       count(distinct model_sp)                           as phone_cnt
from burying_point
group by to_date(cast(event_time_sp as string)), session(event_time_sp,interval '30' second);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
2) TVF窗口函数

看了下官网好像暂不支持TVF的Session窗口

3) 数据验证

我随机筛选了几条是对的上的,可以自行验证

自定义函数

五、join

1、概况

官网地址

[https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/joins/]:

flink sql的join主要分为interval join,regular join,lookup join,这三个用的是最多的,temporal join和普通的外部表join用的比较少。

常用的就interval join稍微麻烦点,其他的和普通sql差异不大。

2、interval join

interval join是两个表在某个时间交集进行关联,如果时间区间没有重合就关联不上,时间过了就会清空状态后端你的数据

3、regular join

和普通sql一样关联,数据一直放在状态后端里,每次数据来了后就会和历史数据进行关联计算,有个设置状态后端数据保存时长,如果没有配置,数据则会永久保存,一般按照也去需求进行配置,防止数据无限增长。

语法

SELECT
    to_date(cast(row_time as string)) as dt,count(*) as cnt
FROM tablea a 
INNER JOIN tableb b ON a.aid = b.aid
AND a.row_time BETWEEN b.row_time - INTERVAL '1' minute AND click_log_table.row_time
group by to_date(cast(row_time as string))
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
4、lookup join

官网:

[https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/jdbc/#lookup-cache]:

lookup join主要是关联第三方数据库的维表,主要是三个配置

lookup join有个性能瓶颈,每次来什么数据查什么数据,如果维表数据量大,这个是很大的性能瓶颈,可以自定义lookup join,修改为批量查询,加个初始化维表的逻辑,这样也可以减少多次查询,减少网络IO。

参数可选默认值类型描述
lookup.cache可选NONE枚举类型可选值: NONE, PARTIAL维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。
lookup.cache.max-rows可选(none)Integer维表缓存的最大行数,若超过该值,则最老的行记录将会过期
lookup.partial-cache.expire-after-write可选(none)Duration在记录写入缓存后该记录的最大保留时间。
lookup.partial-cache.expire-after-access可选(none)Duration在缓存中的记录被访问后该记录的最大保留时间
lookup.partial-cache.cache-missing-key可选trueBoolean是否缓存维表中不存在的键

其中缓存最大条数在flink1.13中是一下三个参数

参数l类型描述
lookup.cache.max-rowsInteger请配置 “lookup.cache” = “PARTIAL” 并使用 “lookup.partial-cache.max-rows” 代替
lookup.cache.ttlDuration请配置 “lookup.cache” = “PARTIAL” 并使用 “lookup.partial-cache.expire-after-write” 代替
lookup.cache.caching-missing-keyBoolean请配置 “lookup.cache” = “PARTIAL” 并使用 “lookup.partial-cache.cache-missing-key” 代替

查询语法

select o.id,o.order_no,o.uid,u.uname
from order_table o
left join dim_user_info FOR SYSTEM_TIME AS OF o.process_time as u on o.uid = u.id
 
  • 1
  • 2
  • 3
  • 4

注意语法 FOR SYSTEM_TIME AS OF o.process_time表示lookup join

六、sql优化

1、概况

官网地址

[https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/tuning/]:

2、MiniBatch 聚合

minibatch原理是对数据进行攒批,减少对状态后端的访问次数,减少磁盘IO,从而提高吞吐量。

开启该功能需要进行配置

// instantiate table environment
TableEnvironment tEnv = ...;

// access flink configuration
TableConfig configuration = tEnv.getConfig();
// set low-level key-value options
configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
3、 Local-Global 聚合

Local-Global优化是姜之前的Agg拆分成两个阶段Local+Agg,数据流中的记录可能会倾斜,本地聚合可以将一定数量具有相同 key 的输入数据累加到单个累加器中,全局聚合将仅接收 reduce 后的累加器,而不是大量的原始输入数据。这可以大大减少网络 shuffle 和状态访问的成本。每次本地聚合累积的输入数据量基于 mini-batch 间隔。这意味着 local-global 聚合依赖于启用了 mini-batch 优化。

开启该功能需要进行配置,要先开启MiniBatch,故这两者是互相配合使用。

// instantiate table environment
TableEnvironment tEnv = ...;

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
4、 Split Distinct

Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。如果 distinct key (即 user_id)的值分布稀疏,则 COUNT DISTINCT 不适合减少数据。即使启用了 local-global 优化也没有太大帮助。因为累加器仍然包含几乎所有原始记录,并且全局聚合将成为瓶颈(大多数繁重的累加器由一个任务处理,即同一天)。

下图显示了拆分 distinct 聚合如何提高性能

下面的sql会自动优化

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
  • 1
  • 2
  • 3

如果不开启该功能,也可以自己手动打散去重

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

开启

// instantiate table environment
TableEnvironment tEnv = ...;

tEnv.getConfig()
  .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
  • 1
  • 2
  • 3
  • 4
  • 5
5、 FILTER 修饰符

聚合的时候同一个字段作为条件才能使用,这个一般用不上,了解下就可以

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

改为

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
七、程序启动
1、启动命令
bin/flink run \
-t yarn-per-job \
-d \
-p 3 \
-Dtaskmanager.numberOfTaskSlots=3 \
-Dyarn.application.queue=test \ 指定 yarn 队列
-Dyarn.application.name=flinkName \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.memory.managed.fraction=0.2 \
-c com.test.appNameClass \
flink.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

命令解释

-t yarn-per-job: yarn模式
-p : 程序运行的并行度
-Dtaskmanager.numberOfTaskSlots: TaskManager的slot数
-Dyarn.application.queue: yarn队列名称
-Dyarn.application.name: flink程序名称
-Djobmanager.memory.process.size:JobManage内存
-Dtaskmanager.memory.process.size=4096mb: TaskManamger内存
-c: 应用程序入口全类名
-Dtaskmanager.memory.managed.fraction: 管理内存比例

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

END) AS web_uv
FROM T
GROUP BY day




改为

```sql
SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
七、程序启动
1、启动命令
bin/flink run \
-t yarn-per-job \
-d \
-p 3 \
-Dtaskmanager.numberOfTaskSlots=3 \
-Dyarn.application.queue=test \ 指定 yarn 队列
-Dyarn.application.name=flinkName \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.memory.managed.fraction=0.2 \
-c com.test.appNameClass \
flink.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

命令解释

-t yarn-per-job: yarn模式
-p : 程序运行的并行度
-Dtaskmanager.numberOfTaskSlots: TaskManager的slot数
-Dyarn.application.queue: yarn队列名称
-Dyarn.application.name: flink程序名称
-Djobmanager.memory.process.size:JobManage内存
-Dtaskmanager.memory.process.size=4096mb: TaskManamger内存
-c: 应用程序入口全类名
-Dtaskmanager.memory.managed.fraction: 管理内存比例

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/396747
推荐阅读
相关标签
  

闽ICP备14008679号