赞
踩
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
// 初始化环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1
在一个本地内存中,生成一个集合作为Flink处理的source。
离线处理代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object ListSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive hbase"))
listDataSet.print()
}
}
实时处理代码如下:
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object ListSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop spark","hive hbase"))
listDataStream.print()
env.execute("ListSourceStream is runned")
}
}
导入本地文本数据作为数据源。
离线处理代码如下:
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object FileSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val fileDataSet = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataSet.print()
}
}
实时处理代码如下:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object FileSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val fileDataStream: DataStream[String] = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataStream.print()
env.execute("FileSourceStream is runned")
}
}
读取hdfs文件,作为数据源。
离线处理代码如下:
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object hdfsSource {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
hdfsDataSet.print()
}
}
实时处理代码如下:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object hdfsSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
hdfsDataStream.print()
env.execute("hdfsSourceStream is runned")
}
}
处理代码如下:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._
object kafkaSourceStream {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
props.setProperty("group.id", "consumer-group")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
//SimpleStringSchema反序列化工具
val kafkaDataStream: DataStream[String] =
env.addSource(new FlinkKafkaConsumer010[String]("test",new SimpleStringSchema(),props))
kafkaDataStream.print()
env.execute(“kafkaSourceStream is runned”)
}
}
除了以上的source数据来源,我们还可以自定义source,只是继承SourceFunction即可。
自定义source代码如下:
import org.apache.flink.streaming.api.functions.source.SourceFunction
class MySource extends SourceFunction[String] {
//定义标志位用来标记是否正常运行
var running = true
override def cancel(): Unit = {
running = false
}
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
val data: Range.Inclusive = 1.to(10)
while (running) {
data.foreach(t => {
sourceContext.collect(t.toString)
})
}
}
}
调用自定义source代码如下:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
object DefineSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val defineSource: DataStream[String] = env.addSource(new MySource())
defineSource.print()
env.execute("DefineSource is runned")
}
}
sink 也就是Flink运行完后,最终要将数据输出到哪儿。
将数据最终输出到内存中的集合中。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object listSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark","hive"))
val list: Seq[String] = listDataSet.collect()
list.foreach(println(_))
}
}
1.3.2基于本地文件的sink
将结果输出到本地文件系统中。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object fileSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val fileDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
fileDataSet.writeAsText("C:\\Users\\thinkpad\\Desktop\\print.txt")
env.execute("fileSink is runned")
}
}
将结果输出到hdfs文件系统中。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object hdfsSink {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val hdfsDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
hdfsDataSet.writeAsText("hdfs://linux01:9000/hdfsSink")
env.execute("hdfsSink is runned")
}
}
将结果输出到kafka文件系统中,用flink作为kafka的生产者。
示例代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010
object kafkaSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "consumer-group")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
listDataStream.addSink(new FlinkKafkaProducer010[String]("linux01:9092,linux02:9092,linux03:9092","test",new SimpleStringSchema()))
env.execute("kafkaSink is runned")
}
}
将计算结果存储到关系数据库中,如mysql等。
导入依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
实现MyJdbcSink类,继承RichSinkFunction,用来是实现保存到mysql中调用的命令。
import java.sql
import java.sql.DriverManager
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
//为什么继承的是富函数
class MyJdbcSink extends RichSinkFunction[String] {
//定义连接参数成员属性
var conn: Connection = _
var prepare: PreparedStatement = _
//打开连接
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata",
"root", "root")
prepare= conn.prepareStatement("INSERT INTO infoTest VALUES (?, ?)")
}
//执行sql语句
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
prepare.setString(1,value)
prepare.setString(2,value)
prepare.execute()
}
//关闭资源
override def close(): Unit = {
prepare.close()
conn.close()
}
}
将结果写入mysql,调用自定义mysql类,代码如下:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
object mysqlSInk {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataSream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
listDataSream.addSink(new MyJdbcSink())
env.execute("mysqlSInk is runned")
}
}
将计算结果存储到redis非关系数据库中。
导入flink-redis依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.0</version>
</dependency>
定义一个redis的mapper类,继承RedisMapper类,用于定义保存到 redis时调用的命令,代码如下:
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
class MyRedisMapper extends RedisMapper[String]{
//定义保存到redis中的命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"redis")
}
override def getKeyFromData(t: String): String = {
t.hashCode.toString
}
override def getValueFromData(t: String): String = {
t
}
}
将结果输入到redis代码如下:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
object RedisSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setDatabase(0).build()
listDataStream.addSink(new RedisSink[String](conf,new MyRedisMapper))
env.execute("RedisSink is runned")
}
}
在flink中有类似于spark的一类转换算子,就是transform,在Flink的编程体系中,我们获取到数据源之后,需要经过一系列的处理即transformation操作,再将最终结果输出到目的Sink使数据落地。
常用的transform转换算子如下:
Transformation | 说明 |
---|---|
map | 将DataSet中的每一个元素转换为另外一个元素 |
flatMap | 将DataSet中的每一个元素转换为0…n个元素 |
mapPartition | 将一个分区中的元素转换为另一个元素 |
filter | 过滤出来一些符合条件的元素 |
reduce | 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素 |
reduceGroup | 将一个dataset或者一个group聚合成一个或多个元素 |
aggregate | 按照内置的方式来进行聚合。例如:SUM/MIN/MAX… |
distinct | 去重 |
join | 将两个DataSet按照一定条件连接到一起,形成新的DataSet |
union | 将两个DataSet取并集,并自动进行去重 |
KeyBy | 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的 |
Split | 根据某些特征把一个 DataStream 拆分成两个或者多个 |
Select | 从一个 SplitStream 中获取一个或者多个 DataStream |
Connect | 连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。 |
CoMap,CoFlatMap | 跟map and flatMap类似,只不过作用在ConnectedStreams上 |
rebalance | 让每个分区的数据均匀分布,避免数据倾斜 |
partitionByHash | 按照指定的key进行hash分区 |
sortPartition | 指定字段对分区中的数据进行排序 |
将DataSet中的每一个元素转换为另外一种形式的元素
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3))
val result: DataSet[Int] = listDataSet.map(_*2)
result.print()
}
}
flatMap也是一种类似于遍历循环,是将每一个元素按照特定的标识切分,变成多个元素。
如将集合中每个元素按照空格切分。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
val result: DataSet[String] = listDataSet.flatMap(_.split(" "))
result.print()
}
}
mapPartition:中的函数是在每个分区运行一次
map :每个元素运行一次
mapPartition是按照分区进行处理数据,传入是一个迭代,是将分区中的元素进行转换,map 和 mapPartition 的效果是一样的,但如果在map的函数中,需要访问一些外部存储。例如:
访问mysql数据库,需要打开连接,此时map效率较低。而使用 mapPartition 可以有效减少连接数,提高效率。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
val result: DataSet[String] = listDataSet.mapPartition(iter => {
iter.flatMap(_.split(" "))
})
result.print()
}
}
filter是遍历循环dataset中每一个元素,filter中满足表达式的过滤出来,不满足表达式的过滤掉。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive","hbase kafka"))
val result: DataSet[String] = listDataSet.filter(_.length>=5)
result.print()
}
}
reduce是对一个 dataset 或者一个 group 来进行聚合计算,按照表达逻辑最终聚合成一个元素。
示例代码如下:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
object Transform {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3,4))
val result = listDataSet.reduce(_+_)
result.print()
}
}
streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有
Window 可以分成两类:CountWindow:按照指定的数据条数生成一个 Window,与时间无关;TimeWindow:按照时间生成 Window。
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当相同key元素数量达到窗口大小时,就会触发窗口的执行。
object Windows {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val file: DataStream[String] = env.socketTextStream("node01",9999)
val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.countWindow(2)
.sum(1)
countStream.print()
env.execute("Windows is runned")
}
}
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。
object Windows {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val file: DataStream[String] = env.socketTextStream("node01",9999)
val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.countWindow(5,2)
.sum(1)
countStream.print()
env.execute("Windows is runned")
}
}
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:
将数据依据固定的窗口长度对数据进行切片。
特点:时间对齐,窗口长度固定,没有重叠。所有的数据只能落在一个窗口里面
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口
适用场景: 适合做 BI 统计等(做每个时间段的聚合计算)。
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。一次数据统计的时间长度 每次统计移动多长的时间
特点:时间对齐,窗口长度固定,可以有重叠。一个数据可以被统计多次,滑动间隔、窗口长度是某个数值的整数倍
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据
电商网站: 登录一个系统之后,多长时间没有操作,session就失效。
手机银行: 登录一个系统之后,多长时间没有操作,session就失效要求重新登录。
由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
特点: 时间无对齐。多长时间之内没有收到数据,这个不是人为能规定的。
session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去
window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:
每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。
trigger()
触发器 定义 window 什么时候关闭,触发计算并输出结果
evitor()
移除器 定义移除某些数据的逻辑
allowedLateness()
允许处理迟到的数据
sideOutputLateData()
将迟到的数据放入侧输出流
getSideOutput()
获取侧输出流
Table API是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测;允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
<!-- flink-table&sql -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>1.9.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:
创建 TableEnvironment:
// 基于流的tableEnv
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = StreamTableEnvironment.create(sEnv)
// 基于批的bTableEnv
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(bEnv)
数据加载通常有两种:一者基于流/批,一者基于TableSource,但是后者在Flink1.11中已经被废弃,所以不建议使用。
case class Student(id:Int,name:String,age:Int,gender:String,course:String,score:Int)
object FlinkBatchTableOps {
def main(args: Array[String]): Unit = {
//构建batch的executionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
val bTEnv = BatchTableEnvironment.create(env)
val dataSets: DataSet[Student] = env.readCsvFile[Student]("E:\\data\\student.csv",
//是否忽略文件的第一行数据(主要考虑表头数据)
ignoreFirstLine = true,
//字段之间的分隔符
fieldDelimiter = "|")
//table 就相当于sparksql中的dataset
val table: Table = bTEnv.fromDataSet(dataSets)
//条件查询
val result: Table = table.select("name,age").where("age=25")
//打印输出
bTEnv.toDataSet[Row](result).print()
}
}
case class Goods(id: Int,brand:String,category:String)
object FlinkStreamTableOps {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sTEnv = StreamTableEnvironment.create(env)
val dataStream: DataStream[Goods] = env.fromElements(
"001|mi|mobile",
"002|mi|mobile",
"003|mi|mobile",
"004|mi|mobile",
"005|huawei|mobile",
"006|huawei|mobile",
"007|huawei|mobile",
"008|Oppo|mobile",
"009|Oppo|mobile",
"010|uniqlo|clothing",
"011|uniqlo|clothing",
"012|uniqlo|clothing",
"013|uniqlo|clothing",
"014|uniqlo|clothing",
"015|selected|clothing",
"016|selected|clothing",
"017|selected|clothing",
"018|Armani|clothing",
"019|lining|sports",
"020|nike|sports",
"021|adidas|sports",
"022|nike|sports",
"023|anta|sports",
"024|lining|sports"
).map(line => {
val fields = line.split("\\|")
Goods(fields(0), fields(1), fields(2))
})
//load data from external system
var table = sTEnv.fromDataStream(dataStream)
// stream table api
table.printSchema()
// 高阶api的操作
table = table.select("category").distinct()
/*
将一个table转化为一个DataStream的时候,有两种选择:
1. toAppendStream :在没有聚合操作的时候使用
2. toRetractStream(缩放的含义) :在进行聚合操作之后使用
*/
sTEnv.toRetractStream[Row](table).print()
env.execute("FlinkStreamTableOps")
}
}
sql仍然是最主要的分析工具,使用dsl当然也能完成业务分析,但是灵活性,简易性上都不及sql。FlinkTable通过sqlQuery来完成sql的查询操作。
object FlinkSQLOps {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val sTEnv = BatchTableEnvironment.create(env)
val dataStream: DataSet[Goods] = env.fromElements(
"001|mi|mobile",
"002|mi|mobile",
"003|mi|mobile",
"004|mi|mobile",
"005|huawei|mobile",
"006|huawei|mobile",
"007|huawei|mobile",
"008|Oppo|mobile",
"009|Oppo|mobile",
"010|uniqlo|clothing",
"011|uniqlo|clothing",
"012|uniqlo|clothing",
"013|uniqlo|clothing",
"014|uniqlo|clothing",
"015|selected|clothing",
"016|selected|clothing",
"017|selected|clothing",
"018|Armani|clothing",
"019|lining|sports",
"020|nike|sports",
"021|adidas|sports",
"022|nike|sports",
"023|anta|sports",
"024|lining|sports"
).map(line => {
val fields = line.split("\\|")
Goods(fields(0), fields(1), fields(2))
})
//load data from external system
sTEnv.registerTable("goods", dataStream)
//sql操作
var sql =
"""
|select
| id,
| brand,
| category
|from goods
|""".stripMargin
sql =
"""
|select
| category,
| count(1) counts
|from goods
|group by category
|order by counts desc
|""".stripMargin
table = sTEnv.sqlQuery(sql)
sTEnv.toDataSet[Row](table).print()
}
}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row
//基于滚动窗口Table操作
object FlinkTrumblingWindowTableOps {
def main(args: Array[String]): Unit = {
//1、获取流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 2、获取table执行环境
val tblEnv = StreamTableEnvironment.create(env)
//3、获取数据源
//输入数据:
val ds = env.socketTextStream("node01", 9999)
.map(line => {
val fields = line.split("\t")
UserLogin(fields(0), fields(1), fields(2), fields(3).toInt,
fields(4))
})
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[UserLogin](Time.seconds(2)) {
override def extractTimestamp(userLogin: UserLogin): Long = {
userLogin.dataUnix * 1000
}
}
)
//4、将DataStream转换成table
//引入隐式
//某天每隔2秒的输入记录条数:
import org.apache.flink.table.api.scala._
val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.rowtime)
// tblEnv.toAppendStream[Row](table).print()
tblEnv.sqlQuery(
s"""
|select
|platform,
|count(1) counts
|from ${table}
|where status = 'LOGIN'
|group by platform, tumble(ts,interval '2' second)
|""".stripMargin)
.toAppendStream[Row]
.print("每隔2秒不同平台登录用户->")
env.execute()
}
}
/** 用户登录
*
* @param platform 所在平台 id(e.g. H5/IOS/ADR/IOS_YY)
* @param server 所在游戏服 id
* @param uid 用户唯一 id
* @param dataUnix 事件时间/s 时间戳
* @param status 登录动作(LOGIN/LOGOUT)
*/
case class UserLogin(platform: String, server: String, uid: String, dataUnix: Int, status: String)
object FlinkTrumblingWindowTableOps2 {
def main(args: Array[String]): Unit = {
//1、获取流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2、获取table执行环境
val tblEnv = StreamTableEnvironment.create(env)
//3、获取数据源
//输入数据:
val ds = env.socketTextStream("node01", 9999)
.map(line => {
val fields = line.split("\t")
UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
})
//4、将DataStream转换成table
//引入隐式
//某天每隔2秒的输入记录条数:
import org.apache.flink.table.api.scala._
val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.proctime)
// tblEnv.toAppendStream[Row](table).print()
tblEnv.sqlQuery(
s"""
|select
| platform,
| count(1) counts
|from ${table}
|where status = 'LOGIN'
|group by platform, tumble(ts,interval '2' second)
|""".stripMargin)
.toAppendStream[Row]
.print("prcotime-每隔2秒不同平台登录用户->")
env.execute()
}
}
自定义标量函数(User Defined Scalar Function)。一行输入一行输出。
某个用户在某个时刻浏览了某个商品,以及商品的价值
{"userID": 2, "eventTime": "2020-10-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
ScalarFunction
抽象类,主要实现eval方法。object FlinkTableUDFOps {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val bTEnv = BatchTableEnvironment.create(env)
val ds = env.fromElements(
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:00\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:02\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:10\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:12\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:15\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
"{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:16\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}"
).map(line => {
val jsonObj = new JSONObject(line)
val userID = jsonObj.getInt("userID")
val eventTime = jsonObj.getString("eventTime")
val eventType = jsonObj.getString("eventType")
val productID = jsonObj.getString("productID")
val productPrice = jsonObj.getDouble("productPrice")
UserBrowseLog(userID, eventTime, eventType, productID, productPrice)
})
//自定义udf
bTEnv.registerFunction("to_time", new TimeScalarFunction())
bTEnv.registerFunction("myLen", new LenScalarFunction())
val table = bTEnv.fromDataSet(ds)
val sql =
s"""
|select
| userID,
| eventTime,
| myLen(eventTime) my_len_et,
| to_time(eventTime) timestamps
|from ${table}
|""".stripMargin
val ret = bTEnv.sqlQuery(sql)
bTEnv.toDataSet[Row](ret).print
}
}
case class UserBrowseLog(
userID: Int,
eventTime: String,
eventType: String,
productID: String,
productPrice: Double
)
/*
自定义类去扩展ScalarFunction 复写其中的方法:eval
at least one method named 'eval' which is public, not
*/
class TimeScalarFunction extends ScalarFunction {
//2020-10-01 10:02:16
private val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def eval(eventTime: String): Long = {
df.parse(eventTime).getTime
}
}
class LenScalarFunction extends ScalarFunction {
//2020-10-01 10:02:16
def eval(str: String): Int = {
str.length
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。