赞
踩
需要注意的是:
// The default is 0, which means Flink SQL state is kept forever.
System.out.println(tableEnv.getConfig().getIdleStateRetention());
// Keep Flink SQL state for 10 seconds; the retention period of the output values is counted from the time of the most recent input.
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
代码
package com.wudl.flink.sql;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * Flink SQL demo that left-joins two dynamic tables built from socket text streams.
 *
 * <p>Every line read from either socket is split on {@code ','}; the first two fields
 * become a {@link TableA} ({@code id,name}) or a {@link TableB} ({@code id,classId})
 * record. The two streams are registered as the views {@code tabA} and {@code tabB}
 * and joined on {@code id}; the changelog of the join is printed to standard output.
 *
 * @author wudl
 * @since 2021-08-20
 */
public class Flink_SQL_join {

    /** Host on which both input sockets are opened. */
    private static final String SOCKET_HOST = "192.168.1.180";

    /** Port that delivers {@code id,name} lines for {@code tabA}. */
    private static final int TABLE_A_PORT = 8888;

    /** Port that delivers {@code id,classId} lines for {@code tabB}. */
    private static final int TABLE_B_PORT = 9999;

    /** How long idle join state is kept, counted from the most recent input. */
    private static final Duration IDLE_STATE_RETENTION = Duration.ofSeconds(10);

    /**
     * Builds the pipeline and runs the join until the result stream ends.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be set up or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The default is 0, which means Flink SQL state is kept forever.
        System.out.println(tableEnv.getConfig().getIdleStateRetention());
        // Bound the join state so it does not grow without limit.
        tableEnv.getConfig().setIdleStateRetention(IDLE_STATE_RETENTION);

        // 2. Read the socket data and map each line to a typed record.
        SingleOutputStreamOperator<TableA> aDS = env.socketTextStream(SOCKET_HOST, TABLE_A_PORT)
                .map(line -> {
                    String[] split = line.split(",");
                    return new TableA(split[0], split[1]);
                });
        SingleOutputStreamOperator<TableB> bDS = env.socketTextStream(SOCKET_HOST, TABLE_B_PORT)
                .map(line -> {
                    String[] split = line.split(",");
                    return new TableB(split[0], Integer.parseInt(split[1]));
                });

        // 3. Expose both streams as dynamic tables.
        tableEnv.createTemporaryView("tabA", aDS);
        tableEnv.createTemporaryView("tabB", bDS);

        // 4. Join the two streams. Table.execute() submits the job itself and print()
        //    consumes its results, so there is deliberately no trailing env.execute():
        //    it would only be reached after the result stream ends and would then try
        //    to run the environment a second time.
        tableEnv.sqlQuery("select * from tabA a left join tabB b on a.id = b.id")
                .execute()
                .print();
    }
}
package com.wudl.flink.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Record type with a string {@code id} and a string {@code name}.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both the no-argument
 * and all-arguments constructors are generated by the Lombok annotations on the class.
 *
 * @author wudl
 * @since 2021-08-20 00:18
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableA {
    // Identifier; Flink_SQL_join uses it as the join key (a.id = b.id).
    private String id;
    // Name associated with the identifier.
    private String name;
}
package com.wudl.flink.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Record type with a string {@code id} and an integer {@code classId}.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both the no-argument
 * and all-arguments constructors are generated by the Lombok annotations on the class.
 *
 * @author wudl
 * @since 2021-08-20 00:18
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableB {
    // Identifier; Flink_SQL_join uses it as the join key (a.id = b.id).
    private String id;
    // Numeric class identifier; Flink_SQL_join parses it from the second input field.
    private int classId;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。