
Flink Chapter 8: FlinkSQL

Series Contents

Flink Chapter 1: Environment Setup
Flink Chapter 2: Basic Operations
Flink Chapter 3: Basic Operations (II)
Flink Chapter 4: Watermarks and Windows
Flink Chapter 5: Process Functions
Flink Chapter 6: Multi-Stream Operations
Flink Chapter 7: Stateful Programming
Flink Chapter 8: FlinkSQL



Preface

In this post I record what I have learned about FlinkSQL.
Add the required dependencies

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
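
The ${flink.version} and ${scala.binary.version} placeholders assume matching <properties> in the same POM. A minimal sketch is shown below; the version values are purely illustrative, so substitute whatever Flink and Scala versions the rest of this series is built against.

        <properties>
            <!-- illustrative values only -->
            <flink.version>1.17.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
        </properties>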

I. Common Functions

1. Quick-start example

SimpleTableExample.scala

package com.atguigu.chapter07

import com.atguigu.chapter02.Source.Event
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object SimpleTableExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read in the data source
    val eventStream: DataStream[Event] = env.fromElements(
      Event("Alice", "./home", 1000L),
      Event("Bob", "./cart", 1000L),
      Event("Alice", "./prod?id=1", 5 * 1000L),
      Event("Cary", "./home", 60 * 1000L),
      Event("Bob", "./prod?id=3", 90 * 1000L),
      Event("Alice", "./prod?id=7", 105 * 1000L)
    )

    // Create the table environment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Convert the DataStream into a Table
    val eventTable: Table = tableEnv.fromDataStream(eventStream)

    // Transform by calling the Table API (not recommended)
    val resultTable: Table = eventTable.select($("user"), $("url"))
      .where($("user").isEqual("Alice"))
    // Write SQL directly instead

    tableEnv.createTemporaryView("eventTable",eventTable)
    val resultSQLTable: Table = tableEnv.sqlQuery("select url,user from eventTable where user='Bob'")

    // Convert back to streams and print
    tableEnv.toDataStream(resultTable).print("1")
    tableEnv.toDataStream(resultSQLTable).print("2")

    env.execute()

  }
}


2. Connecting to external data (CSV)

CommonApiTest.scala

package com.atguigu.chapter07

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CommonApiTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the table environment (two ways)
    // 1.1 Create it directly from the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1.2 Create it from an EnvironmentSettings configuration
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tableEnvironment: TableEnvironment = TableEnvironment.create(settings)

    // 2. Create the input table
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)


    // 3. Query and transform the table
    val resultTable: Table = tableEnv.sqlQuery("select uid,url,ts from eventTable where uid= 'Alice' ")

    val urlCountTable: Table = tableEnv.sqlQuery("select uid,count(url) from eventTable group by uid")

    // 4. Create the output table
    tableEnv.executeSql(
      """
        |CREATE TABLE outTable(
        |  uid STRING,
        |  url STRING,
        |  ts BIGINT
        |) WITH(
        | 'connector' = 'filesystem',
        | 'path'='output',
        | 'format'='csv'
        |)
        |""".stripMargin)


    // 5. Emit the results
    resultTable.executeInsert("outTable")

    tableEnv.toDataStream(resultTable).print("resultTable")
    tableEnv.toChangelogStream(urlCountTable).print("count")
    env.execute()
  }

}

The CSV input file

A note here: when I first created the input table I hit an error. The code itself showed no problems, yet the job would not run. Exporting a CSV file from Excel and pointing the input path at it made the job run; after that, replacing it with a .txt file in the correct format also worked. (I do not know the exact cause.)
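
For reference, a minimal input/clicks.txt matching the uid STRING, url STRING, ts BIGINT CSV schema could look like the lines below. The values mirror the Event data used elsewhere in this post and are only illustrative.

Alice,./home,1000
Bob,./cart,1000
Alice,./prod?id=1,1500000
Bob,./prod?id=5,3660000
Cary,./home,5400000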

3. Time attributes and windows example

TimeAndWindowTest.scala

package com.atguigu.chapter07


import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.{DataTypes, Schema, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import java.time.Duration

object TimeAndWindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 1. Declare the time attribute field in the CREATE TABLE DDL
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT,
        |   et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        |   WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        |   'connector' = 'filesystem',
        |   'path' = 'input/clicks.txt',
        |   'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Declare the time attribute when converting a stream into a table

    val eventStream: DataStream[Event] = env.fromElements(
      Event("Alice", "./home", 1000L),
      Event("Bob", "./cart", 1000L),
      Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
      Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
      Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
      Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
      Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
      }))

    // Deprecated API:
//        val eventTable: Table = tableEnv.fromDataStream(eventStream,$("url"),$("user").as("uid"),
//          $("timestamp").as("ts"),$("et").rowtime())

    // Updated approach:
    val eventTable: Table = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
      .columnByExpression("et", "TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000))")
      .watermark("et", "SOURCE_WATERMARK()").build()).as("uid", "url","ts")


    eventTable.printSchema()

    tableEnv.createTemporaryView("eventTable", eventTable)

    // Test a cumulative (CUMULATE) window
    val resultTable: Table = tableEnv.sqlQuery(
      """
        |select
        |   uid, window_end AS endT ,COUNT(url) AS cnt
        |FROM TABLE(
        | CUMULATE(
        |   TABLE eventTable,
        |   DESCRIPTOR(et),
        |   INTERVAL '30' MINUTE,
        |   INTERVAL '1' HOUR
        | )
        |)
        |GROUP BY uid,window_start,window_end
        |""".stripMargin)

//    tableEnv.toDataStream(resultTable).print()


    // Test an OVER window aggregation
    val overResultTable: Table = tableEnv.sqlQuery(
      """
        |SELECT uid,url,ts,AVG(ts) OVER (
        |  PARTITION BY uid
        |  ORDER BY et
        |  ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
        |) AS avg_ts
        |FROM eventTable
        |""".stripMargin)

    tableEnv.toChangelogStream(overResultTable).print("over")


    env.execute()
  }
}



4. Top N example 1

TopNExample.scala

package com.atguigu.chapter07

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions}

object TopNExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


    // Create the input table
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT,
        |   et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        |   WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // Top N: find the two most active users
    // 1. Group and aggregate to count each user's page views
    val urlCountTable: Table = tableEnv.sqlQuery("select uid,count(url) as cnt from eventTable group by uid")
    tableEnv.createTemporaryView("urlCountTable",urlCountTable)

    // 2. Take the two users with the highest counts
    val top2resultTable: Table = tableEnv.sqlQuery(
      """
        |SELECT uid,cnt,row_num
        |FROM (
        |   SELECT * ,ROW_NUMBER() OVER (
        |     ORDER BY cnt DESC
        |   )AS row_num
        |   FROM urlCountTable
        |)
        |WHERE row_num<=2
        |""".stripMargin)

    tableEnv.toChangelogStream(top2resultTable).print()

    env.execute()
  }

}


5. Top N example 2

TopNWindowExample.scala

package com.atguigu.chapter07

import com.atguigu.chapter02.Source.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Schema, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import java.time.Duration

object TopNWindowExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


    // Create the input data and convert it to a table
    val eventStream: DataStream[Event] = env.fromElements(
      Event("Alice", "./home", 1000L),
      Event("Bob", "./cart", 1000L),
      Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
      Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
      Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
      Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
      Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
    ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
      }))

    val eventTable: Table = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
      .columnByExpression("et", "TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000))")
      .watermark("et", "SOURCE_WATERMARK()").build()).as("uid", "url")

    tableEnv.createTemporaryView("eventTable",eventTable)


    // Top N: find the two most active users within each hour
    // 1. Window aggregation to count each user's page views per window
    val urlCountWindowTable: Table = tableEnv.sqlQuery(
      """
        |SELECT uid ,COUNT(url) AS cnt,window_start,window_end
        |FROM TABLE(
        |   TUMBLE(TABLE eventTable,DESCRIPTOR(et),INTERVAL '1' HOUR)
        |)
        |GROUP BY uid,window_start,window_end
        |""".stripMargin)

    tableEnv.createTemporaryView("urlCountWindowTable",urlCountWindowTable)

    // 2. Take the top two users in each window
    val top2resultTable: Table = tableEnv.sqlQuery(
      """
        |SELECT *
        |FROM (
        |   SELECT * ,ROW_NUMBER() OVER (
        |     PARTITION BY window_start,window_end
        |     ORDER BY cnt DESC
        |   )AS row_num
        |   FROM urlCountWindowTable
        |)
        |WHERE row_num<=2
        |""".stripMargin)

    tableEnv.toDataStream(top2resultTable).print()

    env.execute()

  }
}



II. UDF Functions

1. Scalar Function

UdfTest_ScalaFunction.scala

package com.atguigu.chapter07

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

object UdfTest_ScalaFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


    // 1. Create the input table
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT,
        |   et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        |   WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the scalar function
    tableEnv.createTemporarySystemFunction("myHash",classOf[MyHash])

    // 3. Call the function in SQL
    val resultTable: Table = tableEnv.sqlQuery("select uid,myHash(uid) from eventTable")

    // 4. Print the output
    tableEnv.toDataStream(resultTable).print()

    env.execute()


  }
  // Custom scalar function: hash a string
  class MyHash extends ScalarFunction {
    def eval(str:String): Int ={
      str.hashCode
    }
  }

}



2. Table Function

UdfTest_TableFunction.scala

package com.atguigu.chapter07

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

object UdfTest_TableFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


    // 1. Create the input table
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT,
        |   et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        |   WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the table function
    tableEnv.createTemporarySystemFunction("MySplit", classOf[MySplit])

    // 3. Call the function with a lateral table join
    val resultTable: Table = tableEnv.sqlQuery(
      """
        |select
        |   uid,url,word,len
        |from eventTable,lateral table(mySplit(url)) as T(word,len)
        |""".stripMargin)

    // 4. Print the output
    tableEnv.toDataStream(resultTable).print()

    env.execute()

  }

  // Custom table function: split the url field on '?'
  @FunctionHint(output = new DataTypeHint("ROW<word STRING,length INT>"))
  class MySplit extends TableFunction[Row] {
    def eval(str: String): Unit = {
      str.split("\\?").foreach(s => collect(Row.of(s, Int.box(s.length))))
    }
  }

}



3. Aggregate Function

UdfTest_AggregateFunction.scala

package com.atguigu.chapter07

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.AggregateFunction

object UdfTest_AggregateFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


    // 1. Create the input table
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT,
        |   et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        |   WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the aggregate function
    tableEnv.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])

    // 3. Call the function
    val resultTable: Table = tableEnv.sqlQuery(
      """
        |select
        |   uid,WeightedAvg(ts,1) as avg_ts
        |from eventTable
        |group by uid
        |""".stripMargin)

    // 4. Print the output
    tableEnv.toChangelogStream(resultTable).print()

    env.execute()

  }

  // Accumulator defined as a separate case class
  case class WeightedAccumulator(var sum: Long = 0, var count: Int = 0)

  // Custom aggregate function: weighted average
  class WeightedAvg extends AggregateFunction[java.lang.Long, WeightedAccumulator] {
    override def getValue(accumulator: WeightedAccumulator): java.lang.Long = {
      if (accumulator.count == 0) {
        null
      } else {
        accumulator.sum / accumulator.count
      }
    }

    override def createAccumulator(): WeightedAccumulator = WeightedAccumulator() // Create the accumulator

    // Called for every incoming row
    def accumulate(accumulator: WeightedAccumulator, iValue: java.lang.Long, iWeight: Int): Unit = {
      accumulator.sum += iValue
      accumulator.count += iWeight
    }
  }
}


4. Table Aggregate Function (not recommended)

UdfTest_TableAggFunction.scala

package com.atguigu.chapter07

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{$, call}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

import java.sql.Timestamp


object UdfTest_TableAggFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)


    // 1. Create the input table
    tableEnv.executeSql(
      """
        | CREATE TABLE eventTable(
        |   uid STRING,
        |   url STRING,
        |   ts BIGINT,
        |   et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000)),
        |   WATERMARK FOR et AS et - INTERVAL '2' SECOND
        | ) WITH(
        | 'connector' = 'filesystem',
        | 'path' = 'input/clicks.txt',
        | 'format' = 'csv'
        | )
        |""".stripMargin)

    // 2. Register the table aggregate function
    tableEnv.createTemporarySystemFunction("top2", classOf[Top2])

    // 3. Call the function
    // First do a window aggregation to compute cnt
    val urlCountWindowTable: Table = tableEnv.sqlQuery(
      """
        |SELECT uid ,COUNT(url) AS cnt,window_start as wstart ,window_end as wend
        |FROM TABLE(
        |   TUMBLE(TABLE eventTable,DESCRIPTOR(et),INTERVAL '1' HOUR)
        |)
        |GROUP BY uid,window_start,window_end
        |""".stripMargin)

    // Call the table aggregate function via the Table API
    val resultTable: Table = urlCountWindowTable.groupBy($("wend"))
      .flatAggregate(call("top2", $("uid"), $("cnt"), $("wstart"), $("wend")))
      .select($("uid"), $("rank"), $("cnt"), $("wend"))



    // 4. Print the output
    tableEnv.toChangelogStream(resultTable).print()

    env.execute()

  }

  // Case classes for the output row and the intermediate accumulator
  case class Top2Result(uid: String, window_start: Timestamp, window_end: Timestamp, cnt: Long, rank: Int)

  case class Top2Acc(var maxCount: Long, var secondMaxCount: Long, var uid1: String, var uid2: String, var window_start: Timestamp, var window_end: Timestamp)

  // Custom table aggregate function
  class Top2 extends TableAggregateFunction[Top2Result, Top2Acc] {
    override def createAccumulator(): Top2Acc = Top2Acc(Long.MinValue, Long.MinValue, null, null, null, null)

    // For each incoming row, update the accumulator
    def accumulate(acc: Top2Acc, uid: String, cnt: Long, window_start: Timestamp, window_end: Timestamp): Unit = {
      acc.window_start = window_start
      acc.window_end = window_end
      // Check whether the current count ranks in the top two
      if (cnt > acc.maxCount) {
        // Demote the current leader to second place
        acc.secondMaxCount = acc.maxCount
        acc.uid2 = acc.uid1
        acc.maxCount = cnt
        acc.uid1 = uid
      } else if (cnt > acc.secondMaxCount) {
        acc.secondMaxCount = cnt
        acc.uid2 = uid
      }
    }

    // Emit the result rows
    def emitValue(acc: Top2Acc, out: Collector[Top2Result]): Unit = {
      // Skip values that are still at the initial sentinel
      if (acc.maxCount != Long.MinValue) {
        out.collect(Top2Result(acc.uid1, acc.window_start, acc.window_end, acc.maxCount, 1))
      }
      if (acc.secondMaxCount!=Long.MinValue){
        out.collect(Top2Result(acc.uid2,acc.window_start,acc.window_end,acc.secondMaxCount,2))
      }
    }
  }
}



Summary

That wraps up my notes on FlinkSQL.
