赞
踩
FOR SYSTEM_TIME AS OF
lookup.cache.max-rows
和lookup.cache.ttl
参数来启用 Lookup Cache。

参数 | 说明 |
---|---|
connector | 连接器,可以是jdbc 、kafka 、filesystem … |
driver | 数据库驱动 |
lookup.cache.ttl | Lookup Cache中每行数据 的 最大 存活时间 |
lookup.cache.max-rows | Lookup Cache中的最大行数 |
当 缓存的行数>
lookup.cache.max-rows
时,将清除存活时间最久的记录
缓存中的行的存活时间超过 lookup.cache.ttl
也会被清除
环境:WIN10+IDEA+JDK1.8+Flink1.13+MySQL8
<!-- Centralised version management for all dependencies below. -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <fastjson.version>2.0.19</fastjson.version>
    <lombok.version>1.18.24</lombok.version>
    <mysql.version>8.0.31</mysql.version>
</properties>

<!-- https://mvnrepository.com/ -->
<dependencies>
    <!-- Flink core APIs and local runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink SQL (Blink planner + Scala bridge) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 'format'='csv' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'format'='json' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 'connector' = 'jdbc' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 'driver' = 'com.mysql.cj.jdbc.Driver' -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>

    <!-- JSON parsing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>

    <!-- Reduces JavaBean boilerplate -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>
-- Rebuild the demo database from scratch so the script is re-runnable.
DROP DATABASE IF EXISTS db0;
CREATE DATABASE db0;

-- Dimension table read by the Flink JDBC lookup join.
-- NOTE: MySQL 8 (the stated environment) deprecates integer display widths
-- such as INT(3)/BIGINT(5) and the FLOAT(M,D)/DOUBLE(M,D) precision syntax,
-- so plain type names are used; the Flink-side type mapping is unchanged.
CREATE TABLE db0.tb0 (
    a VARCHAR(255) PRIMARY KEY,          -- lookup join key (v.x = t.a)
    b INT,
    c BIGINT,
    d FLOAT,
    e DOUBLE,
    f DATE DEFAULT '2022-10-24',         -- fixed default so f is predictable
    g TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- populated at insert time
);

-- Seed data; f and g are omitted on purpose and take their column defaults.
-- Standard `INSERT INTO` instead of MySQL's non-standard `INSERT` shorthand.
INSERT INTO db0.tb0 (a, b, c, d, e) VALUES
    ('aa', 1, 11, 1.11, 11.11),
    ('bb', 2, 22, 2.22, 22.22),
    ('cc', 3, 33, 3.33, 33.33);

-- Sanity check of the seeded rows.
SELECT * FROM db0.tb0;
SQL
CREATE TEMPORARY TABLE temp_tb0 ( a STRING, b INT, c BIGINT, d FLOAT, e DOUBLE, f DATE, g TIMESTAMP, PRIMARY KEY(a) NOT ENFORCED) WITH ( 'lookup.cache.max-rows' = '2', 'lookup.cache.ttl' = '30 second', 'connector' = 'jdbc', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://localhost:3306/db0', 'username' = 'root', 'password' = '123456', 'table-name' = 'tb0' )
测试代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Hello { public static void main(String[] args) { //创建流和表的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env); //创建表,连接MySQL表 tbEnv.executeSql("CREATE TEMPORARY TABLE temp_tb0 (\n" + " a STRING,\n" + " b INT,\n" + " c BIGINT,\n" + " d FLOAT,\n" + " e DOUBLE,\n" + " f DATE,\n" + " g TIMESTAMP,\n" + " PRIMARY KEY(a) NOT ENFORCED)\n" + "WITH (\n" + " 'lookup.cache.max-rows' = '2',\n" + " 'lookup.cache.ttl' = '30 second',\n" + " 'connector' = 'jdbc',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'url' = 'jdbc:mysql://localhost:3306/db0',\n" + " 'username' = 'root',\n" + " 'password' = '123456',\n" + " 'table-name' = 'tb0'\n" + ")"); //执行查询,打印 tbEnv.sqlQuery("SELECT * FROM temp_tb0").execute().print(); } }
测试结果打印
+----+----+---+----+------+-------+------------+----------------------------+
| op | a | b | c | d | e | f | g |
+----+----+---+----+------+-------+------------+----------------------------+
| +I | aa | 1 | 11 | 1.11 | 11.11 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | bb | 2 | 22 | 2.22 | 22.22 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
| +I | cc | 3 | 33 | 3.33 | 33.33 | 2022-10-24 | 2022-11-29 14:57:50.000000 |
+----+----+---+----+------+-------+------------+----------------------------+
SELECT * FROM v
JOIN t
FOR SYSTEM_TIME AS OF v.y
ON v.x=t.a
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.Scanner; import static org.apache.flink.table.api.Expressions.$; public class Hi { public static void main(String[] args) { //创建流和表的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env); //创建左表 DataStreamSource<String> d = env.addSource(new ManualSource()); Table tb = tbEnv.fromDataStream(d, $("x"), $("y").proctime()); tbEnv.createTemporaryView("v", tb); //创建右表(维度表) tbEnv.executeSql("CREATE TEMPORARY TABLE t ( " + " a STRING, " + " b INT, " + " c BIGINT, " + " d FLOAT, " + " e DOUBLE, " + " f DATE, " + " g TIMESTAMP, " + " PRIMARY KEY(a) NOT ENFORCED) " + "WITH ( " + " 'lookup.cache.max-rows' = '2', " + " 'lookup.cache.ttl' = '30 second', " + " 'connector' = 'jdbc', " + " 'driver' = 'com.mysql.cj.jdbc.Driver', " + " 'url' = 'jdbc:mysql://localhost:3306/db0', " + " 'username' = 'root', " + " 'password' = '123456', " + " 'table-name' = 'tb0' " + ")"); //执行查询,打印 tbEnv.sqlQuery("SELECT * FROM v " + "JOIN t " + " FOR SYSTEM_TIME AS OF v.y " + " ON v.x=t.a").execute().print(); } /** 手动输入的数据源 */ public static class ManualSource implements SourceFunction<String> { public ManualSource() {} @Override public void run(SourceFunction.SourceContext<String> sc) { Scanner scanner = new Scanner(System.in); while (true) { String str = scanner.nextLine().trim(); if (str.equals("STOP")) {break;} if (!str.equals("")) {sc.collect(str);} } scanner.close(); } @Override public void cancel() {} } }
测试结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。