赞
踩
Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数
Flink第六章:多流操作
Flink第七章:状态编程
Flink第八章:FlinkSQL
这次博客我们记录一下FlinkSQL的学习内容。
引入需要的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
SimpleTableExample.scala
package com.atguigu.chapter07
import com.atguigu.chapter02.Source.Event
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
 * Smallest possible Table API / SQL round trip: a fixed in-memory stream of
 * `Event`s is turned into a table, queried once through the Table API and
 * once through SQL, and both results are printed as data streams.
 */
object SimpleTableExample {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Source: a handful of hard-coded click events.
    val clicks: DataStream[Event] = streamEnv.fromElements(
      Event("Alice", "./home", 1000L),
      Event("Bob", "./cart", 1000L),
      Event("Alice", "./prod?id=1", 5 * 1000L),
      Event("Cary", "./home", 60 * 1000L),
      Event("Bob", "./prod?id=3", 90 * 1000L),
      Event("Alice", "./prod?id=7", 105 * 1000L)
    )

    // Table environment bound to the streaming environment.
    val tEnv = StreamTableEnvironment.create(streamEnv)

    // Turn the DataStream into a Table.
    val clicksTable = tEnv.fromDataStream(clicks)

    // Variant 1: Table API calls (the original author advises against this style).
    val aliceRows = clicksTable
      .select($("user"), $("url"))
      .where($("user").isEqual("Alice"))

    // Variant 2: plain SQL against a registered temporary view.
    val bobQuery = "select url,user from eventTable where user='Bob'"
    tEnv.createTemporaryView("eventTable", clicksTable)
    val bobRows = tEnv.sqlQuery(bobQuery)

    // Convert both result tables back to streams and print them with distinct prefixes.
    tEnv.toDataStream(aliceRows).print("1")
    tEnv.toDataStream(bobRows).print("2")

    streamEnv.execute()
  }
}
CommonApiTest.scala
package com.atguigu.chapter07
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
 * Tour of the common Table API entry points: creating a table environment,
 * declaring a filesystem/CSV source and sink via DDL, running SQL queries,
 * inserting into the sink and printing results as streams.
 */
object CommonApiTest {
  def main(args: Array[String]): Unit = {
    // DDL for the input table read from input/clicks.txt in CSV format.
    val sourceDdl =
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin

    // DDL for the output table written to the `output` path in CSV format.
    val sinkDdl =
      """
        |CREATE TABLE outTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT
        |) WITH(
        | 'connector' = 'filesystem',
        | 'path'='output',
        | 'format'='csv'
        |)
        |""".stripMargin

    val aliceQuery = "select uid,url,ts from eventTable where uid= 'Alice' "
    val perUserCountQuery = "select uid,count(url) from eventTable group by uid"

    // 1. Create a table environment (two ways).
    // 1.1 Directly on top of a stream execution environment.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(streamEnv)

    // 1.2 From explicit environment settings (shown for illustration; not used below).
    val streamingSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val standaloneTableEnv: TableEnvironment = TableEnvironment.create(streamingSettings)

    // 2. Register the input table.
    tEnv.executeSql(sourceDdl)

    // 3. Query / transform.
    val aliceRows = tEnv.sqlQuery(aliceQuery)
    val perUserCounts = tEnv.sqlQuery(perUserCountQuery)

    // 4. Register the output table.
    tEnv.executeSql(sinkDdl)

    // 5. Emit results: insert into the sink table and print both queries.
    aliceRows.executeInsert("outTable")
    tEnv.toDataStream(aliceRows).print("resultTable")
    // The grouped count is printed as a changelog stream.
    tEnv.toChangelogStream(perUserCounts).print("count")

    streamEnv.execute()
  }
}
csv文件
这里特别说明一下:我在创建输入表时遇到了错误——代码没有报错,但是编译没有通过。最终我从Excel导出一个csv文件传入,才完成运行;之后再替换成符合格式要求的txt文件,也可以正常运行。(具体是什么原因,我也不清楚。)
TimeAndWindowTest.scala
package com.atguigu.chapter07
import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.{DataTypes, Schema, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import java.time.Duration
/**
 * Demonstrates the two ways of declaring an event-time attribute (in DDL, and
 * while converting a DataStream into a Table) and then runs a cumulate-window
 * aggregation and an OVER-window aggregation on the resulting table.
 */
object TimeAndWindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Declare the time attribute in the CREATE TABLE DDL:
    //    `et` is computed from `ts` and carries a watermark lagging 2 seconds behind.
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT,
        | et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        | WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Declare the time attribute while converting a stream into a table.
    // The explicit [Event] type argument is required: scalac cannot infer it from the
    // later withTimestampAssigner call and would otherwise instantiate it to Nothing.
    val eventStream: DataStream[Event] = env.fromElements(
      Event("Alice", "./home", 1000L),
      Event("Bob", "./cart", 1000L),
      Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
      Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
      Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
      Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
      Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    ).assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }))

    // Deprecated way of doing the conversion, kept for reference:
    // val eventTable: Table = tableEnv.fromDataStream(eventStream,$("url"),$("user").as("uid"),
    // $("timestamp").as("ts"),$("et").rowtime())

    // Current way: add `et` as a computed column, reuse the stream's watermarks,
    // then rename the first three columns to uid / url / ts.
    val eventTable: Table = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
      .columnByExpression("et", "TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000))")
      .watermark("et", "SOURCE_WATERMARK()").build()).as("uid", "url", "ts")
    eventTable.printSchema()
    // Registered under the same name as the DDL table created in step 1.
    tableEnv.createTemporaryView("eventTable", eventTable)

    // Cumulate window test: 30-minute step, 1-hour maximum window size, count per uid.
    val resultTable: Table = tableEnv.sqlQuery(
      """
        |select
        | uid, window_end AS endT ,COUNT(url) AS cnt
        |FROM TABLE(
        | CUMULATE(
        | TABLE eventTable,
        | DESCRIPTOR(et),
        | INTERVAL '30' MINUTE,
        | INTERVAL '1' HOUR
        | )
        |)
        |GROUP BY uid,window_start,window_end
        |""".stripMargin)
    // tableEnv.toDataStream(resultTable).print()

    // OVER window aggregation test: per uid, ordered by et, average ts over the
    // current row and the three preceding rows.
    val overResultTable: Table = tableEnv.sqlQuery(
      """
        |SELECT uid,url,ts,AVG(ts) OVER (
        | PARTITION BY uid
        | ORDER BY et
        | ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
        |) AS avg_ts
        |FROM eventTable
        |""".stripMargin)
    tableEnv.toChangelogStream(overResultTable).print("over")

    env.execute()
  }
}
TopNExample.scala
package com.atguigu.chapter07
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}
/**
 * Plain (non-windowed) Top-N: counts the urls visited per user from the
 * CSV-backed `eventTable` and keeps the two users with the highest count.
 */
object TopNExample {
  def main(args: Array[String]): Unit = {
    // Input table with an event-time column `et` and a 2-second watermark delay.
    val sourceDdl =
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT,
        | et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        | WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin

    // Ranks all users by count (descending) and keeps ranks 1 and 2.
    val top2Query =
      """
        |SELECT uid,cnt,row_num
        |FROM (
        | SELECT * ,ROW_NUMBER() OVER (
        | ORDER BY cnt DESC
        | )AS row_num
        | FROM urlCountTable
        |)
        |WHERE row_num<=2
        |""".stripMargin

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(streamEnv)

    // Register the input table.
    tEnv.executeSql(sourceDdl)

    // Step 1: grouped aggregation giving the visit count of every user.
    val visitsPerUser = tEnv.sqlQuery("select uid,count(url) as cnt from eventTable group by uid")
    tEnv.createTemporaryView("urlCountTable", visitsPerUser)

    // Step 2: pick the two most active users and print the result as a changelog.
    val top2Users = tEnv.sqlQuery(top2Query)
    tEnv.toChangelogStream(top2Users).print()

    streamEnv.execute()
  }
}
TopNWindowExample.scala
package com.atguigu.chapter07
import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Schema, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import java.time.Duration
/**
 * Windowed Top-N: counts url visits per user in 1-hour tumbling event-time
 * windows and keeps, for each window, the two users with the highest count.
 */
object TopNWindowExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Input: hard-coded events with timestamps/watermarks (2 seconds of allowed disorder).
    // The explicit [Event] type argument is required: scalac cannot infer it from the
    // later withTimestampAssigner call and would otherwise instantiate it to Nothing.
    val eventStream: DataStream[Event] = env.fromElements(
      Event("Alice", "./home", 1000L),
      Event("Bob", "./cart", 1000L),
      Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
      Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
      Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
      Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
      Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    ).assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
        }))

    // Add the event-time column `et`, reuse the stream's watermarks and rename
    // the first two columns to uid / url.
    val eventTable: Table = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
      .columnByExpression("et", "TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000))")
      .watermark("et", "SOURCE_WATERMARK()").build()).as("uid", "url")
    tableEnv.createTemporaryView("eventTable", eventTable)

    // Top-N: the two most active users within each hour.
    // Step 1: window aggregation giving each user's visit count per 1-hour tumbling window.
    val urlCountWindowTable: Table = tableEnv.sqlQuery(
      """
        |SELECT uid ,COUNT(url) AS cnt,window_start,window_end
        |FROM TABLE(
        | TUMBLE(TABLE eventTable,DESCRIPTOR(et),INTERVAL '1' HOUR)
        |)
        |GROUP BY uid,window_start,window_end
        |""".stripMargin)
    tableEnv.createTemporaryView("urlCountWindowTable", urlCountWindowTable)

    // Step 2: rank users inside every window by count and keep the top two.
    val top2resultTable: Table = tableEnv.sqlQuery(
      """
        |SELECT *
        |FROM (
        | SELECT * ,ROW_NUMBER() OVER (
        | PARTITION BY window_start,window_end
        | ORDER BY cnt DESC
        | )AS row_num
        | FROM urlCountWindowTable
        |)
        |WHERE row_num<=2
        |""".stripMargin)
    tableEnv.toDataStream(top2resultTable).print()

    env.execute()
  }
}
UdfTest_ScalaFunction.scala
package com.atguigu.chapter07
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
/**
 * Scalar UDF example: registers [[UdfTest_ScalaFunction.MyHash]] as `myHash`
 * and applies it to the `uid` column of the CSV-backed `eventTable`.
 */
object UdfTest_ScalaFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Create the input table.
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT,
        | et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        | WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the scalar function.
    tableEnv.createTemporarySystemFunction("myHash", classOf[MyHash])

    // 3. Call the function from SQL.
    val resultTable: Table = tableEnv.sqlQuery("select uid,myHash(uid) from eventTable")

    // 4. Print the result.
    tableEnv.toDataStream(resultTable).print()
    env.execute()
  }

  /** User-defined scalar function that returns the hash code of a string. */
  class MyHash extends ScalarFunction {
    /**
     * @param str the value to hash; may be null
     * @return `str.hashCode`, or 0 when `str` is null (instead of throwing a
     *         NullPointerException)
     */
    def eval(str: String): Int = {
      java.util.Objects.hashCode(str)
    }
  }
}
UdfTest_TableFunction.scala
package com.atguigu.chapter07
import com.atguigu.chapter07.UdfTest_ScalaFunction.MyHash
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.annotation.{DataTypeHint, ExtractionVersion, FunctionHint, HintFlag, InputGroup}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row
import java.lang.annotation.Annotation
/**
 * Table UDF example: registers [[UdfTest_TableFunction.MySplit]] and joins
 * every row of `eventTable` with the pieces of its `url` via LATERAL TABLE.
 */
object UdfTest_TableFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Create the input table.
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT,
        | et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        | WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the table function.
    tableEnv.createTemporarySystemFunction("MySplit", classOf[MySplit])

    // 3. Call the function; its two output columns are aliased as (word, len).
    val resultTable: Table = tableEnv.sqlQuery(
      """
        |select
        | uid,url,word,len
        |from eventTable,lateral table(mySplit(url)) as T(word,len)
        |""".stripMargin)

    // 4. Print the result.
    tableEnv.toDataStream(resultTable).print()
    env.execute()
  }

  /**
   * User-defined table function that splits a string on `?` and emits one
   * `(word, length)` row per piece.
   */
  @FunctionHint(output = new DataTypeHint("ROW<word STRING,length INT>"))
  class MySplit extends TableFunction[Row] {
    /**
     * @param str the string to split; a null value emits no rows instead of
     *            throwing a NullPointerException
     */
    def eval(str: String): Unit = {
      Option(str).foreach { value =>
        value.split("\\?").foreach(s => collect(Row.of(s, Int.box(s.length))))
      }
    }
  }
}
UdfTest_AggregateFunction.scala
package com.atguigu.chapter07
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.AggregateFunction
/**
 * Aggregate UDF example: registers [[UdfTest_AggregateFunction.WeightedAvg]]
 * and uses it to compute a per-user weighted average of `ts`.
 */
object UdfTest_AggregateFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Create the input table.
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT,
        | et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        | WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the aggregate function.
    tableEnv.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])

    // 3. Call the function: every row is given weight 1.
    val resultTable: Table = tableEnv.sqlQuery(
      """
        |select
        | uid,WeightedAvg(ts,1) as avg_ts
        |from eventTable
        |group by uid
        |""".stripMargin)

    // 4. Print the result as a changelog (a grouped aggregate produces updates).
    tableEnv.toChangelogStream(resultTable).print()
    env.execute()
  }

  /** Accumulator: running weighted sum and running total of weights. */
  case class WeightedAccumulator(var sum: Long = 0, var count: Int = 0)

  /**
   * User-defined aggregate function computing a weighted average:
   * sum(value * weight) / sum(weight), using integer division.
   */
  class WeightedAvg extends AggregateFunction[java.lang.Long, WeightedAccumulator] {

    /** @return the weighted average, or null when no weight has been accumulated. */
    override def getValue(accumulator: WeightedAccumulator): java.lang.Long = {
      if (accumulator.count == 0) {
        null
      } else {
        accumulator.sum / accumulator.count
      }
    }

    /** Creates an empty accumulator (sum = 0, count = 0). */
    override def createAccumulator(): WeightedAccumulator = WeightedAccumulator()

    /**
     * Called for every input row.
     *
     * @param accumulator the running state to update
     * @param iValue      the value of the current row
     * @param iWeight     the weight of the current row
     */
    def accumulate(accumulator: WeightedAccumulator, iValue: java.lang.Long, iWeight: Int): Unit = {
      // The value must be scaled by its weight; adding the bare value only
      // yields a correct average when every weight is 1.
      accumulator.sum += iValue.longValue() * iWeight
      accumulator.count += iWeight
    }
  }
}
package com.atguigu.chapter07
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{$, call}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector
import java.sql.Timestamp
/**
 * Table-aggregate UDF example: counts url visits per user in 1-hour tumbling
 * windows, then feeds those counts through [[UdfTest_TableAggFunction.Top2]]
 * (grouped by window end) to emit the two highest counts of each window.
 */
object UdfTest_TableAggFunction {
  def main(args: Array[String]): Unit = {
    // Input table with an event-time column `et` and a 2-second watermark delay.
    val sourceDdl =
      """
        | CREATE TABLE eventTable(
        | uid STRING,
        | url STRING,
        | ts BIGINT,
        | et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        | WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin

    // Per-user visit count for every 1-hour tumbling window.
    val windowCountQuery =
      """
        |SELECT uid ,COUNT(url) AS cnt,window_start as wstart ,window_end as wend
        |FROM TABLE(
        | TUMBLE(TABLE eventTable,DESCRIPTOR(et),INTERVAL '1' HOUR)
        |)
        |GROUP BY uid,window_start,window_end
        |""".stripMargin

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(streamEnv)

    // 1. Create the input table.
    tEnv.executeSql(sourceDdl)

    // 2. Register the table aggregate function.
    tEnv.createTemporarySystemFunction("top2", classOf[Top2])

    // 3. Call the function.
    // First the window aggregation that produces `cnt` ...
    val countsPerWindow = tEnv.sqlQuery(windowCountQuery)
    // ... then the table aggregate function, invoked through the Table API.
    val top2PerWindow = countsPerWindow
      .groupBy($("wend"))
      .flatAggregate(call("top2", $("uid"), $("cnt"), $("wstart"), $("wend")))
      .select($("uid"), $("rank"), $("cnt"), $("wend"))

    // 4. Print the result as a changelog stream.
    tEnv.toChangelogStream(top2PerWindow).print()
    streamEnv.execute()
  }

  /** One output row of [[Top2]]: a user, its window bounds, its count and its rank (1 or 2). */
  case class Top2Result(uid: String, window_start: Timestamp, window_end: Timestamp, cnt: Long, rank: Int)

  /** Accumulator of [[Top2]]: the two largest counts seen so far, their users and the window bounds. */
  case class Top2Acc(var maxCount: Long, var secondMaxCount: Long, var uid1: String, var uid2: String, var window_start: Timestamp, var window_end: Timestamp)

  /** User-defined table aggregate function that keeps the two largest counts it has seen. */
  class Top2 extends TableAggregateFunction[Top2Result, Top2Acc] {

    /** Starts with both counts at `Long.MinValue` (the "nothing seen yet" marker) and null users/bounds. */
    override def createAccumulator(): Top2Acc =
      Top2Acc(
        maxCount = Long.MinValue,
        secondMaxCount = Long.MinValue,
        uid1 = null,
        uid2 = null,
        window_start = null,
        window_end = null
      )

    /** Called for every input row: records the window bounds and updates the top-two ranking. */
    def accumulate(acc: Top2Acc, uid: String, cnt: Long, window_start: Timestamp, window_end: Timestamp): Unit = {
      acc.window_start = window_start
      acc.window_end = window_end

      // Both checks are evaluated against the accumulator as it was before this row.
      val beatsFirst = cnt > acc.maxCount
      val beatsSecondOnly = !beatsFirst && cnt > acc.secondMaxCount

      if (beatsFirst) {
        // The previous leader drops to second place.
        acc.secondMaxCount = acc.maxCount
        acc.uid2 = acc.uid1
        acc.maxCount = cnt
        acc.uid1 = uid
      }
      if (beatsSecondOnly) {
        acc.secondMaxCount = cnt
        acc.uid2 = uid
      }
    }

    /** Emits rank 1 and then rank 2, skipping any slot that still holds the initial marker value. */
    def emitValue(acc: Top2Acc, out: Collector[Top2Result]): Unit = {
      val rankedSlots = Seq(
        (acc.uid1, acc.maxCount, 1),
        (acc.uid2, acc.secondMaxCount, 2)
      )
      rankedSlots
        .filter { case (_, count, _) => count != Long.MinValue }
        .foreach { case (user, count, position) =>
          out.collect(Top2Result(user, acc.window_start, acc.window_end, count, position))
        }
    }
  }
}
FlinkSQL的内容就记录到这里.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。