赞
踩
sql 的诞生就是为了简化我们对数据开发,可以使用少量的 sql 代码,帮助我完成对数据的查询,分析等功能
对于用户只需要表达我想要什么,具体处理逻辑交给框架,系统处理,用户无需关心,对于一些非专业的开发人员有了解 sql,并且 sql 相对我们学习 java,c 等语言更简单,大数据培训学习成本更低,如果跨团队,或者非大数据开发人员,也可以通过 sql 来进行 flink 任务的开发
查询优化器,会对我们编写的 sql 进行优化,生成效率更好的执行计划,所以用户不需要了解底层细节,即高效的获取结果
sql 语义发展几十年是一个很稳定的语言,少有变动,当我们引擎的升级,甚至替换成另一个引擎,都可以做到兼容地,平滑地升级,无需更改我们的已经编写好的 sql 代码
对于 flink 通过 sql 的表达式,来完成流批的统一,一套 sql 代码,既可以跑流任务,也可以跑批任务,减少我们开发的成本
- -- 字符串类型
-
- # char类型
- CHAR
- CHAR(n) -- n在 1 和 2147483647 之间 未设置n=1
- # 字符串类型
- VARCHAR
- VARCHAR(n) -- n在 1 和 2147483647 之间 未设置n=1
- STRING -- 等于最大的varchar(max)
- # 二进制类型
- BINARY
- BINARY(n) -- 范围同上
- # 可变长度二进制类型
- VARBINARY
- VARBINARY(n) -- 类似于string
- BYTES
-
- -- 数字类型
-
- # 带有精度的十进制数字类型 -- 类似于java中的
- DECIMAL
- DECIMAL(p)
- DECIMAL(p, s)
-
- DEC
- DEC(p)
- DEC(p, s)
-
- NUMERIC
- NUMERIC(p)
- NUMERIC(p, s)
- # 带符号
- TINYINT -- -128 to 127
- SMALLINT -- -32768 to 32767
- # 不带符号的
- INT -- 2147483,648 to 2147483647
- INTEGER
- BIGINT -- -9223372036854775808 to 9223372036854775807
- # 带小数的
- FLOAT
- DOUBLE
-
- -- 时间类型
-
- #日期
- DATE -- 2020-10-12
- #时间
- TIME
- TIME(p) -- 10:10:12.p 不指定p,p= 0
- #时间戳
- TIMESTAMP
- TIMESTAMP(p) -- 2020-12-12 12:10:11.p
-
- -- 其他类型
- #
- ARRAY<t>
- t ARRAY
- #map类型
- MAP<kt, vt>
-
-
- -- 对应java的类型
-
- Class Type
- java.lang.String STRING
- java.lang.Boolean BOOLEAN
- boolean BOOLEAN NOT NULL
- java.lang.Byte TINYINT
- byte TINYINT NOT NULL
- java.lang.Short SMALLINT
- short SMALLINT NOT NULL
- java.lang.Integer INT
- int INT NOT NULL
- java.lang.Long BIGINT
- long BIGINT NOT NULL
- java.lang.Float FLOAT
- float FLOAT NOT NULL
- java.lang.Double DOUBLE
- double DOUBLE NOT NULL
- java.sql.Date DATE
- java.time.LocalDate DATE
- java.sql.Time TIME(0)
- java.time.LocalTime TIME(9)
- java.sql.Timestamp TIMESTAMP(9)
- java.time.LocalDateTime TIMESTAMP(9)
- java.time.OffsetDateTime TIMESTAMP(9) WITH TIME ZONE
- java.time.Instant TIMESTAMP(9) WITH LOCAL TIME ZONE
- java.time.Duration INVERVAL SECOND(9)
- java.time.Period I NTERVAL YEAR(4) TO MONTH
- byte[] BYTES
- T[] ARRAY<T>
- java.util.Map<K, V> MAP<K, V>
-

- /*
- 下面是1.12版本的系统内置的函数,具体我们可以到官网查看,根据需求使用即可
- https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
- */
-
-
-
- // TODO 主要介绍自定义函数
- /*
- udf 和 udaf 需要定义eval方法,实现自己的逻辑,具体系统会调用对应的方法
-
- udf : 传入一个值/多个/或者不传入,返回一个新的值,可以重载该方法,具体会根据传入的参数调用对应eval烦恼歌发 类似`map`算子,作用于sql
- udaf : 自定义聚合函数,根据自己的逻辑定义累加器
- udtf : 用作与表中,可返回一个或多个值,
- */
-
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.typeutils.RowTypeInfo;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.EnvironmentSettings;
- import org.apache.flink.table.api.java.StreamTableEnvironment;
- import org.apache.flink.table.functions.AggregateFunction;
- import org.apache.flink.table.functions.ScalarFunction;
- import org.apache.flink.table.functions.TableFunction;
- import org.apache.flink.types.Row;
-
- import java.sql.SQLException;
-
- public class UDFDemo {
- public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
- EnvironmentSettings.newInstance().build());
-
- // 注册函数
- tEnv.registerFunction("customFunc1", new CustomUDF());
- tEnv.registerFunction("customFunc2", new CustomUDAF());
- tEnv.registerFunction("customFunc3", new CustomUDTF());
- }
-
- static class Acc {
- int result;
- public Integer gerResult() {
- return result;
- }
- public Acc merge(Acc acc) {
- result = acc.gerResult() + result;
- return this;
- }
- public void incr() {
- result++;
- }
- }
-
- static class CustomUDF extends ScalarFunction {
- // UDF 需要定义该方法
- public int eval(String str) {
- int hc = 0;
- for (char c : str.toUpperCase().toCharArray()) {
- hc = hashCode() >> c;
- }
- hc = hc - 1 - str.length();
- hc = hc >> 7;
- return hc;
- }
- }
-
- static class CustomUDTF extends TableFunction<Row> {
- // udtf 需要定义该方法,在该方法实现逻辑
- public void eval(String str) throws SQLException {
- if (str != null) {
- for (String s : str.split(",")) {
- Row row = new Row(2);
- row.setField(0, s);
- row.setField(1, 1);
- collect(row);
- }
- }
- }
-
- @Override
- public TypeInformation<Row> getResultType() {
- return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
- }
- }
-
- static class CustomUDAF extends AggregateFunction<Integer, Acc> {
- @Override
- public Integer getValue(Acc accumulator) {
- return accumulator.gerResult();
- }
- @Override
- public Acc createAccumulator() {
- return new Acc();
- }
- // 累加
- public void accumulate(Acc acc,String input){
- if("*".equals(input)){
- return;
- }
- acc.incr();
- }
- public void accumulate(Acc acc){
- acc.incr();
- }
- }
- }
-

❝
flink sql 中时间机制本质与 dataStream api 相同,只不过使用少于区别,稍加注意即可,注意指定 watermark 需要使用 sql 中 timestamp(3)类型(具体对应 java 类型可根据上面类型自行判断),设置 watermark 后可使用 ROWTIEM 字段(具体看 sql 代码),没有设置可直接使用 PROCTIME 字段
注意 : 不同的时间语义要严格对应环境配置的时间语义,否则可能出现异常
❝
时间字段为两种,属于非用户指定字段,设置完时间语义后,根据需求使用具体的时间字段
❞
ROWTIME : 事件时间
PROCTIME : 处理时间字段
场景 :
join : 场景与双流 join 或者 维表 join,目前 flink 支持的不是很好
topN & 去重 : 语法基本相同,row_num > 1 即 topN , 当=1 则是去重操作
topN 场景一些热搜,排名等内容
去重顾名思义,就是为了去重,去重会涉及到 retract 流(以后会详细讲)内容,会更新之前已经存在的结果
❞
// TODO 下面代码仅供参考,具体测试根据自己时间环境来
// 以下只是一些简单的案例,后面会逐步深入复杂sql和原理层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author 857hub
*/
public class ClickhouseSinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().
// useBlinkPlanner().
build()
);
tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
// sources
String source = "CREATE TABLE source (\n" +
" `id` int,\n" +
" `name` varchar.\n" +
" `ts` timestamp(3),\n" +
// 指定watermark 允许延迟5s
"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"+
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test1',\n" +
" 'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
" 'properties.group.id' = 'xzw',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
String source2 = "CREATE TABLE source2 (\n" +
" `id` int,\n" +
" `name` varchar,\n" +
" `ts` timestamp(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'test2',\n" +
" 'properties.bootstrap.servers' = '172.16.100.109:9092',\n" +
" 'properties.group.id' = 'xzw',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
// clickhouse sink 由我自己定义,后面会对sql自定义source和sink进行讲解
String sink = "CREATE TABLE sink (\n" +
" `id` INT,\n" +
" `name` VARCHAR\n" +
") WITH (\n" +
// 需要自定义接信息参数 -- option
" 'connector' = 'xzw_ck',\n" +
" 'url' = 'jdbc:clickhouse://localhost:8123/default',\n" +
" 'table-name' = 'test',\n" +
" 'username' = 'default',\n" +
" 'password' = '123456'\n" +
" )";
// 执行 source sink sql
tEnv.executeSql(source);
tEnv.executeSql(source2);
tEnv.executeSql(sink);
/*
由于是简单使用,没有在场景应用,简单介绍一下区别,北京大数据培训可以根据们不同的区别在自己项目中使用
left json : 无论是否join上都返回左表的数据
inner join : 只有join上才会返回匹配后的结果
full outer join : 两边的数据都会返回,无论是否join上,没有的则为null
interval join : 基于时间范围内的join,在指定的时间范围内返回join上的数据
*/
String joinSql = "select * from source1 s1" +
"left join source2 s2" +
// 内连接
// "inner join source2" || "join source2"
// 全连接
// "full outer join source2"
// 时间范围join
// "s1.ts >= s2.ts AND s1.ts < s2.ts + INTERVAL '10' MINUTE" +
" on s1.id =s2.id "
;
Table joinTable = tEnv.sqlQuery(joinSql);
// 分组排序,取topN, 如果要是去重 rnum=1即可实现去重操作
String insertSql = "insert into sink select id,name from(" +
"select *," +
"row_number() over(partition by id order by ts) as rnum " +
"from "+joinTable+" where rnum < 5 " +
")";
// add insert sql
TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
// 随意使用
// Optional<JobClient> jobClient = tableResult.getJobClient();
}
// 添加多个sql具体执行
private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
StatementSet statementSet = tEnv.createStatementSet();
for (String sql : sqls) {
if ("*".equals(sql) || sql.length()>=27) {
continue;
}
statementSet.addInsertSql(sql);
}
return statementSet.execute();
}
}
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <flink.version>1.12.2</flink.version>
- <scala.version>2.11</scala.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!---->
- <dependency>
- <groupId>ru.yandex.clickhouse</groupId>
- <artifactId>clickhouse-jdbc</artifactId>
- <version>0.2</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.6</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.2.4</version>
- </dependency>
- </dependencies>

文章来源857Hub
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。