赞
踩
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。
批流统一
支持高吞吐、低延迟、高性能的流处
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session窗口操作
支持具有Backpressure功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
支持迭代计算
Flink在JVM内部实现了自己的内存管理
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
SQL/Table API :Flink对流数据的进一步封装,也实现了非常丰富的API
DataStream :对流数据进行开发,Flink封装了针对流数据操作的API
ProcessFunction :最底层开发,自定义实现功能模块
JobManager
TaskManger
ResourceMager
Dispacher
Client
Flink用来提交任务的客户端,可以用命令提交,也可以用浏览器提交
Task
Task是一个阶段多个功能相同suntask的集合,类似spark中的taskset
Subtask
Subtask是flink中任务执行最小单元,是一个java类的实例,这份java类中有属性和方法,完成具体的计算逻辑
Operator chain
没有shuffle的多个算子合并在一个subtask中就形成了Operator chain,类似spark中的pipeline
Slot
Flink中计算资源进行隔离的单元,一个slot中可以运行多个subtask,但是这些subtask必须是来自同一个job的不同task的subtask
State
Flink任务运行过程中计算的中间结果
Checkpoint
Flink用来将中间结果持久化的指定的存储系统的一种定期执行的机制
stateBackend
Flink用来存储中间计算结果的存储系统,flink支持三种statebackend。分别是memory,fsbackend,rocksDB
首先由Dispatcher接收提交的应用,然后启动JobManger,JobManger根据自己生成的执行图去向ResourceManger申请对应的Slots数量,ResourceManger接收到请求后会根据向自己注册的TaskManager所有Slots数量去判断是否有足够的资源执行任务,然后满足资源需求,ResourceManger就会发送Slots所存在的TaskManager申请资源的JobManger,然后TaskManger连接JobManger,JobManager则分配执行任务,然后TaskManager开始执行分配的任务,不同TaskManager在执行过程中可能存在交换数据
首先客户端上传Flink的Jar包和配置,然后提交job至ResourceManager(yarn),由ResourceManager启用ApplicationMaster,在ApplicatiionMaster内启动Flink组件JobManager、ResourceManager(flink) ,然后JobManager向ResourceManager(flink)申请所需的资源,ResourceManager(flink)在向ResourceManager(yarn)申请资源,ResourceManager(yarn),根据申请资源数量,在一定数量的NodeManager节点内启动的TaskManager,然后TaskManager向ResourceManger(flink)注册Slots,然后ResourceManager向TaskManager发送申请Slots的JobManager,然后TaskManger连接JobManager,JobManager则发送分配的任务,TaskManager就会根据分配的任务开始执行,其实在yarn管理资源的时候,就是当接收到一个Job任务时,就会启动一个Flink集群,当任务处理完时,这个集群资源也就释放了
disableChaining
startNewChain
slotSharingGroup(共享组)
框架 | 优点 | 缺点 |
---|---|---|
Storm | 低延迟 | 吞吐量低、不能保证exactly-once、编程API不丰富 |
Spark Streaming | 吞吐量高、可以保证exactly-once、编程API丰富 | 延迟较高 |
Flink | 低延迟、吞吐量高、可以保证exactly-once、编程API丰富 | 快速迭代中,API变化比较快 |
下载地址
解压安装
tar -zxvf flink-1.10.1-bin-scala_2.11.tgz
配置文件
vi conf/flink-conf.yaml
# JobManager runs.
#指定jobmanger地址
jobmanager.rpc.address: localhost
# The RPC port where the JobManager is reachable.
#内部通信端口号
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
#配置JobManager JVM堆内存大小
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
#配置TaskManager JVM堆内存大小
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#配置taskmanager默认Slots数
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
#配置默认并行度
parallelism.default: 1
将配置好的Flink发送至其他节点
scp /opt/flink/* 用户名@主机名:$PWD/opt/flink
启动flink
./bin/start-cluster.sh
查看进程
#JobManager进程
StandaloneSessionClusterEntrypoint
#TaskManager进程
TaskManagerRunner
命令行提交任务
#参数说明:
#-m指定主机名后面的端口为JobManager的REST的端口,而不是RPC的端口,RPC通信端口是6123
#-p 指定是并行度
#-c 指定main方法的全类名
./bin/flink run -m 192.168.xx.xx:8081 -p 4 -c com.wedoctor.flink.WordCount /home/pgxl/liuzc/flink-project-scala-1.0.jar --hostname 192.168.xx.xx --port 7777
访问Web页面
192.168.**.**:8081
Web提交任务
上传Jar包
选择打好的Jar包
查看上传的jar包并进行配置
指定Class以及配置host与port
查看Plan,没有问题就可以submit了
然后就可以看到以下的界面,Task允许状态都是正常的
测试数据
hello word
hello Flink
hello java
hello scala
hello word
hello kafka
<!- 根据自己的需求添加依赖这里仅需要以下两个,修改对应的scala版本,以及所想用的flink版本->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.2</version>
</dependency>
import org.apache.flink.streaming.api.scala._
object FlinkTest {
def main(args: Array[String]): Unit = {
//创建flink环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(4)
//创建流
val stream: DataStream[String] = env.fromCollection(List(
"hello java",
"hello flink",
"hello java",
"hello flink"
))
stream.print()
env.execute("demo")
}
}
2> hello flink
4> hello flink
3> hello java
1> hello java
env.readTextFile("路径/文件名.格式")
import org.apache.flink.streaming.api.scala._
//字符串转对象
case class WaterSensor(id:String,ts:Long,vc:Double)
object SourceFile {
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(1)
//读取本地文件数据
val fileDS: DataStream[String] = env.readTextFile("in/StringToClass.txt")
//直接输出至控制台
fileWater.print()
//执行
env.execute("textFile")
}
}
env.socketTextStream("192.168.**.**",端口号)
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object SourcePort {
def main(args: Array[String]): Unit = {
//创建流处理环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//指定端口地址与端口号
val inputDataStream: DataStream[String] = env.socketTextStream("192.168.**.**",7777)
//对获取的数据进行WorldCount
val resultDataSet: DataStream[(String, Int)] = inputDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_,1))
.keyBy(0)
.sum(1)
//输出结果
resultDataSet.print()
//执行
env.execute("port")
}
}
启动程序后,在可以在本机或者Linux上启用端口,命令:nc -lk [端口号]
//prop是配置连接Kafka的信息
env.addSource(new FlinkKafkaConsumer[数据类型]([Topic名],new SimpleStringSchema(),prop))
读取kafka需要添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.2</version>
</dependency>
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
object SourceKafka {
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//创建Properties对象,设置连接kafka信息
val prop = new Properties()
//Kafka连接地址与端口
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.**.**:9092")
//消费者组名称
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"Kafka_To_Flink")
//KEY的数据类型
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
//VALLUE的数据类型
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
//配置读取策略
//earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
//读取Kafka中名为‘sensor’的Toipc中的数据
val KafkaToStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), prop))
//直接输出至控制台
KafkaToStream.print()
//执行
env.execute("kafka")
}
}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import scala.util.Random
//自定义类型
case class WaterSensor(id:String,ts:Long,vc:Double)
//继承SourceFunction类,指定输出的数据类型
class MySensorSource extends SourceFunction[WaterSensor]{
//重写run方法
override def run(sourceContext: SourceFunction.SourceContext[Source.WaterSensor]): Unit = {
//具体实现
...
}
override def cancel(): Unit = ???
}
object SourceMy{
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//使用自定义数据源
val mydefDStream: DataStream[Source.WaterSensor] = env.addSource(new MySensorSource)
//直接输出
mydefDStream.print()
//执行
env.execute("abc")
}
}
添加MySQL依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
//自定义类型
case class table_flink_to_mysql(id:String,cnt:Int)
//实现自定义Source
class MysqlSource(Database:String,Table:String) extends SourceFunction[table_flink_to_mysql]{
//配置连接MySQL信息
private val driver="com.mysql.jdbc.Driver"
private val url="jdbc:mysql://192.168.**.**:3306/"+Database
private val username="root"
private val password="root123"
private var connerction: Connection = null
//获取connerction对象
def getConnerction():Unit={
Class.forName(driver)
connerction = DriverManager.getConnection(url,username,password)
}
//查询语句
val selectTable="select * from "+Table
//重写run方法
override def run(sourceContext: SourceFunction.SourceContext[table_aaa]): Unit = {
getConnerction
var rs:ResultSet=connerction.createStatement().executeQuery(selectTable)
while(rs.next()){
//读取Table中的字段
var id =rs.getString("id")
var cnt=rs.getInt("cnt")
//以包装成table_flink_to_mysql类型输出
sourceContext.collect(table_flink_to_mysql(id,cnt))
}
}
override def cancel(): Unit = ???
}
object SourceMySQL{
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//使用自定义数据源,传入参数,Database、Table
val mydefDStream: DataStream[table_aaa] = env.addSource(new MysqlSource("test","flink_to_mysql"))
//输出拉取到的数据
mydefDStream.print()
//执行
env.execute("readMySQL")
}
}
映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
import org.apache.flink.streaming.api.scala._
object Transfrom_map {
def main(args: Array[String]): Unit = {
//1.创建执行的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//2.从集合中读取数据
val sensorDS: DataStream[WaterSensor] = env.fromCollection(
// List(1,2,3,4,5)
List(
WaterSensor("ws_001", 1577844001, 45.0),
WaterSensor("ws_002", 1577844015, 43.0),
WaterSensor("ws_003", 1577844020, 42.0)
)
)
val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1))
//3.打印
sensorDSMap.print()
//4.执行
env.execute("sensor")
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
自定义MapFunction
//sensor-data.log 文件数据
sensor_1,1549044122,10
sensor_1,1549044123,20
sensor_1,1549044124,30
sensor_2,1549044125,40
sensor_1,1549044126,50
sensor_2,1549044127,60
sensor_1,1549044128,70
sensor_3,1549044129,80
sensor_3,1549044130,90
sensor_3,1549044130,100
import org.apache.flink.streaming.api.scala._
object SourceFileMap {
def main(args: Array[String]): Unit = {
//1.创建执行的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//2.从指定路径获取数据
val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
val MapDS = fileDS.map(
lines => {
//更加逗号切割 获取每个元素
val datas: Array[String] = lines.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
//使用继承了MapFunction的类
//fileDS.MyMapFunction
//3.打印
MapDS.print()
//4.执行
env.execute("map")
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
/**
* 自定义继承 MapFunction
* MapFunction[T,O]
* 自定义输入和输出
*
*/
class MyMapFunction extends MapFunction[String,WaterSensor]{
override def map(t: String): WaterSensor = {
val datas: Array[String] = t.split(",")
WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)
}
}
}
所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object Transform_RichMapFunction {
def main(args: Array[String]): Unit = {
//1.创建执行的环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//2.从指定路径获取数据
val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction)
//3.打印
myMapDS.print()
//4.执行
env.execute("map")
}
/**
* 自定义继承 RicMapFunction
* RicMapFunction[T,O]
* 自定义输入和输出
*
*/
class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{
override def map(value: String): WaterSensor = {
val datas: Array[String] = value.split(",")
// WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt)
}
// 富函数提供了生命周期方法
override def open(parameters: Configuration): Unit = {}
override def close(): Unit = {}
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
Rich Function有一个生命周期的概念。典型的生命周期方法有:
import org.apache.flink.streaming.api.scala._
object Transform_FlatMap {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val listDS: DataStream[List[Int]] = env.fromCollection(
List(
List(1, 2, 3, 4),
List(5, 6, 7,1,1,1)
)
)
val resultDS: DataStream[Int] = listDS.flatMap(list => list)
resultDS.print()
// 4. 执行
env.execute()
}
}
import org.apache.flink.streaming.api.scala._
object Transform_Filter {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val listDS: DataStream[List[Int]] = env.fromCollection(
List(
List(1, 2, 3, 4,1, 2, 3, 4),
List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4),
List(1, 2, 3, 4),
List(5, 6, 7,1,1,1),
List(1, 2, 3, 4),
List(5, 6, 7,1,1,1)
)
)
// true就留下,false就抛弃
listDS.filter(num => {
num.size>5
})
.print("filter")
// 4. 执行
env.execute()
}
}
import org.apache.flink.streaming.api.scala._
object Transform_KeyBy {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
//3.转换为样例类
val mapDS = sensorDS.map(
lines => {
val datas = lines.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
// 4. 使用keyby进行分组
// TODO 关于返回的key的类型:
// 1. 如果是位置索引 或 字段名称 ,程序无法推断出key的类型,所以给一个java的Tuple类型
// 2. 如果是匿名函数 或 函数类 的方式,可以推断出key的类型,比较推荐使用
// *** 分组的概念:分组只是逻辑上进行分组,打上了记号(标签),跟并行度没有绝对的关系
// 同一个分组的数据在一起(不离不弃)
// 同一个分区里可以有多个不同的组
// val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0)
// val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id")
val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
// val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(
// new KeySelector[WaterSensor, String] {
// override def getKey(value: WaterSensor): String = {
// value.id
// }
// }
// )
sensorKS.print().setParallelism(5)
// 4. 执行
env.execute()
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
import org.apache.flink.streaming.api.scala._
object Transform_Shuffle {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
val shuffleDS = sensorDS.shuffle
sensorDS.print("data")
shuffleDS.print("shuffle")
// 4. 执行
env.execute()
}
}
import org.apache.flink.streaming.api.scala._
object Transform_Split {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
// 3.转换成样例类
val mapDS: DataStream[WaterSensor] = sensorDS.map(
lines => {
val datas: Array[String] = lines.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
val splitDS: SplitStream[WaterSensor] = mapDS.split(
sensor => {
if (sensor.vc < 40) {
Seq("info")
} else if (sensor.vc < 80) {
Seq("warn")
} else {
Seq("error")
}
}
)
val errorDS: DataStream[WaterSensor] = splitDS.select("error")
val warnDS: DataStream[WaterSensor] = splitDS.select("warn")
val infoDS: DataStream[WaterSensor] = splitDS.select("info")
infoDS.print("info")
warnDS.print("warn")
errorDS.print("error")
// 4. 执行
env.execute()
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
import org.apache.flink.streaming.api.scala._
object Transform_Connect {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
// 3.转换成样例类
val mapDS: DataStream[WaterSensor] = sensorDS.map(
lines => {
val datas: Array[String] = lines.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
// 4. 从集合中再读取一条流
val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))
val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS)
// coMap表示连接流调用的map,各自都需要一个 function
resultCS.map(
sensor=>sensor.id,
num=>num+1
).print()
// 4. 执行
env.execute()
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
connect与 union 区别:
import org.apache.flink.streaming.api.scala._
object Transform_Union {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2. 从集合中读取流
val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4))
val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10))
val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110))
// TODO union 真正将多条流合并成一条流
// 合并的流,类型必须一致
// 可以合并多条流,只要类型一致
num1DS.union(num2DS).union(num3DS)
.print()
// 4. 执行
env.execute()
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
Flink作为计算框架,主要应用于数据计算处理上, 所以在keyBy对数据进行分流后,可以对数据进行相应的统计分析
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object Transform_Process {
def main(args: Array[String]): Unit = {
// 1.创建执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 2.读取数据
val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
// 3.转换成样例类
val mapDS: DataStream[WaterSensor] = sensorDS.map(
lines => {
val datas: Array[String] = lines.split(",")
WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
}
)
//按照ID 进行分组
val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
sensorKS.process(new MyKeyedProcessFunction)
// 4. 执行
env.execute()
}
// 自定义KeyedProcessFunction,是一个特殊的富函数
// 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型
// 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑
class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] {
/**
* 处理逻辑:来一条处理一条
*
* @param value 一条数据
* @param ctx 上下文对象
* @param out 采集器:收集数据,并输出
*/
override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value)
// 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple
// ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple
}
}
/**
* 定义样例类:水位传感器:用于接收空高数据
*
* @param id 传感器编号
* @param ts 时间戳
* @param vc 空高
*/
case class WaterSensor(id: String, ts: Long, vc: Double)
}
dataStream.writeAsCsv("路径/文件名")
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object SinkCSV {
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//读取本地文件数据
val dataStream: DataStream[String] = env.readTextFile("in/a.txt")
//以csv格式写入本地文件
dataStream.writeAsCsv("in/b.txt")
//执行
env.execute()
}
//KafkaToStream是DataStream
KafkaToStream.addSink(new FlinkKafkaProducer[数据类型]("192.168.**.**:9092","Topic",new SimpleStringSchema()))
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.kafka.clients.consumer.ConsumerConfig
object SinkKafka {
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//读取本地文件数据
val dataStream: DataStream[String] = env.readTextFile("in/a.txt")
//将获取的数据传到kafka
KafkaToStream.addSink(new FlinkKafkaProducer[String]("192.168.**.**:9092","senserout",new SimpleStringSchema()))
//执行
env.execute("kafkademo2")
}
}
//自定义Sink,继承RichSinkFunction类
class MyRichSinkToMySQL extends RichSinkFunction[student] {
override def open(parameters: Configuration): Unit = {
//建立连接
...
}
override def invoke(value: student): Unit = {
//实现
...
}
override def close(): Unit = {
//关闭连接
...
}
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
//自定义类型Student
case class student(id:Int,name:String)
object SinkMySQL {
def main(args: Array[String]): Unit = {
//创建流执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//读取本地文件数据
val dataStream: DataStream[String] = env.readTextFile("in/a.txt")
//转换成自定义类型
val students: DataStream[student] = dataStream.map(x => {
val strings: Array[String] = x.split(",")
student(strings(0).toInt, strings(1))
})
//指定Sink,输出至MySQL
students.addSink(new MyRichSinkToMySQL)
//执行
env.execute()
}
}
//自定义Sink输出至MySQL
class MyRichSinkToMySQL extends RichSinkFunction[student] {
var conn: Connection =_
var insertTmp: PreparedStatement =_
var updateTmp: PreparedStatement =_
override def open(parameters: Configuration): Unit = {
//建立连接
//配置MySQL地址,用户名,密码
conn=DriverManager.getConnection("jdbc:mysql://192.168.**.**:3306/test","root","root123")
//预写insert语句
insertTmp=conn.prepareStatement("insert into aaa (id,name) values (?,?)")
//预写uodate语句
updateTmp=conn.prepareStatement("update aaa set name = ? where id = ? ")
}
override def invoke(value: student): Unit = {
//执行更新语句
updateTmp.setString(1,value.name)
updateTmp.setInt(2,value.id)
//执行
updateTmp.execute()
//判断更新条数,如果为0,说明没有初始值,改为执行插入语句
if (updateTmp.getUpdateCount == 0 ){
insertTmp.setInt(1,value.id)
insertTmp.setString(2,value.name)
insertTmp.execute()
}
}
override def close(): Unit = {
//关闭连接
insertTmp.close()
updateTmp.close()
conn.close()
}
}
时间窗口(Time Window)
计数窗口(Count Window)
//方式一
.window(TumblingEventTimeWindows.of(Time.seconds(15)))
//方式二
.timeWindow(Time.seconds(15))
//窗口长度15秒,滑动步长5秒
//方式一
.window( SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(3)))
//方式二
.timeWindow(Time.seconds(15),Time.seconds(5))
//会话时长10分钟
.window(EventTimeSessionWindows.withGap(Time.minutes(10))
//计数10条
.countWindow(10)
//计数10条,滑动2条
.countWindow(10,2)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//EventTime 事件发生事件
//IngestionTime 事件进入Flink事件
//ProcessingTime 事件处理事件
//给env创建的每一个Stream追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
在 Flink的流式处理中, 绝大部分的业务都会使用 eventTime一般只在eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime
import Source.WaterSensor
import org.apache.flink.api.common.state._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
//定义带时间戳字段的样例类
case class WaterSensor(id:String,ts:Long,vc:Double)
object FlinkEventTime_wt {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//为env创建的每一个Stream追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//从端口读取数据
val dataStream: DataStream[String] = env.socketTextStream("192.168.**.**",7777)
//将读取的数据转换成WaterSensor类型
dataStream.map(x=>{
val strings: Array[String] = x.split(",")
WaterSensor(strings(0),strings(1).toLong,strings(2).toDouble)
})
//方式一(推荐):数据密集时,可以使用周期性生成watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WaterSensor](Time.seconds(3)) {//设置乱序延迟为3秒
//将处理过的数据提取出时间戳,乘以1000转成以毫秒的形式计数
override def extractTimestamp(t: WaterSensor): Long = t.ts*1000L
})
//方式二:分配升序时间戳,传入毫秒时间戳
//.assignAscendingTimestamps(_.ts*1000L)
//设置窗口为10秒并以及3秒的长度滑动
val dataStream3: WindowedStream[(String, Double), String, TimeWindow] = dataStream2.map(x=>(x.id,x.vc)).keyBy(x=>x._1).timeWindow(Time.seconds(10),Time.seconds(3))
//对window中的数据进行的处理
val dataStream4: DataStream[(String, Double)] = dataStream3.reduce((x,y)=>(x._1,x._2+y._2))
//输出window中的数据
dataStream4.print("ev_wate_win")
//启动
env.execute("watermark")
}
}
//设置窗口最终销毁时间,延迟一分钟
.allowedLateness(Time.minutes(1))
//设置侧输出流,处理窗口销毁后得迟到数据
.sideOutputLateData(new OutputTag[ApacheLogEvent]("late"))
示例
dataStream.filter(_.method=="GET").keyBy(_.url)
//设置窗口
.timeWindow(Time.minutes(10),Time.seconds(5))
//设置窗口最终销毁时间,延迟一分钟,处理迟到数据
.allowedLateness(Time.minutes(1))
//设置侧输出流,处理窗口销毁后得迟到数据
.sideOutputLateData(new OutputTag[ApacheLogEvent]("late"))
.aggregate(new PageCountAgg(),new PageViewCountWindowResult())
列表状态(List state)
联合列表状态(Union list state)
广播状态(Broadcast state)
值状态(Value state)
列表状态(Liststate)
映射状态(Map state)
聚合状态(Reducing state & Aggregating State)
键控状态的类型有很多种例如ValueState、MapState、ListState等等
//几种不同类型的定义格式
//ValueState
var 键控名:ValueState[数据类型]=getRuntimeContext.getState(new ValueStateDescriptor[数据类型]("自定义名称",classOf[数据类型]))
//ListState
val 键控名: ListState[数据类型] = getRuntimeContext.getListState(new ListStateDescriptor[数据类型]("自定义名称",classOf[数据类型]))
//MapState(map与其他状态类型稍有不同,就是需要两个classOf)
val 键控名: MapState[key的数据类型, value的数据类型] = getRuntimeContext.getMapState(new MapStateDescriptor[key的数据类型,value的数据类型]("自定义名称",classOf[key的数据类型],classOf[value的数据类型]))
ValueStae类型示例
//定义键控状态
var test:ValueState[String]=getRuntimeContext.getState(new ValueStateDescriptor[String]("testabc",classOf[String]))
//读取状态值
val testabc:String=test.value()
//更新状态值
test.update("zs")
MemoryStateBackend
public MemoryStateBackend() {this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);}
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//配置使用MemoryStateBackend
env.setStateBackend(new MemoryStateBackend)
FsStateBackend
public FsStateBackend(String checkpointDataUri) {this(new Path(checkpointDataUri));}
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//配置使用FsStateBackend
env.setStateBackend(new FsStateBackend("存储路径"))
RocksDBStateBackend
RocksDBStateBackend比较特殊,如果需要使用,需要添加依赖:
根据自己的使用的scala和flink版本进行修改
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.1</version>
</dependency>
public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
this((new Path(checkpointDataUri)).toUri(), enableIncrementalCheckpointing);
}
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//RocksDBStateBackend除了配置存储路径,还需要配置是否增量存储,否则就是全量存储
env.setStateBackend(new RocksDBStateBackend("存储路径",true))
AT-MOST-ONCE (最多一次):当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重
播丢失的数据。At-most-once 语义的含义是最多处理一次事件。
AT-LEAST-ONCE (至少一次):在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障称为at-
least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。
EXACTLY-ONCE (精确一次):恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅
意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。
原理图一
原理图二
原理图三
原理图四
原理图五
使用两阶段提交需要注意一个问题,就是如果kafka的时候等待超过15分钟没有被提交就会自动丢弃数据,如果Checkpoint操作超过15分钟就会发生这种情况,所有在做配置的时候Checkpoint超时时间尽量小一些
针对不同得流F1ink提供了8个Process Function
processElement(v: IN, ctx: Context, out: Collector[OUT]),流中的每一个元素都会调用这个方法,调用结果将会放在Collector 数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(sideoutputs)。
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
使用示例
/**********************************简单使用proceess实现测输出流的功能*********************************************/
import Source.WaterSensor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object ProcessFunction {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
//获取端口传入的数据
val Stream: DataStream[String] = env.socketTextStream("192.168.**.**", 7777)
//使用Process,创建SlipeProcessFunction类,并传入默认值
val data: DataStream[Int] = Stream.process(new SlipeTmpProcess(30.0))
//主流输出hige
data.print("hige")
//测输出流输出low
data.getSideOutput(new OutputTag[String]("low")).print("low")
env.execute()
}
}
//实现SlipeTmpProcess 继承ProcessFunction,设置传入数据类型为Int,输出数据类型为String
class SlipeTmpProcess(d: Double) extends ProcessFunction[Int,String]{
override def processElement(value: Int, ctx: ProcessFunction[Int, String]#Context, out: Collector[String]): Unit = {
//当接收的数据大于默认值30时属于高的
if (value.vc>d){
out.collect("当前数值高于默认值")
}else{
//不满足条件的数据输入到测输出流
//调用output方法
ctx.output(new OutputTag[WaterSensor]("low"),"当前数值低于默认值")
}
}
}
这里我们重点介绍KeyedProcessFunction.
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext0等方法。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:
代码示例
//**************************这个示例模拟接收一个温度传感器得数据进行监控温度数据并作出反应*****************************
import Source.WaterSensor
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
//定义样例类
case class WaterSensor(id:String,ts:Long,vc:Double)
object ProcessTest {
def main(args: Array[String]): Unit = {
//创建流处理执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//使用事件发生时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置并行度为1
env.setParallelism(1)
//设置一个端口作为数据源
val Stream: DataStream[String] = env.socketTextStream("192.168.**.**",7777)
//将获取得数据转换成watersensor类型
val dataStream: DataStream[WaterSensor] = Stream.map(data => {
val strings: Array[String] = data.split(",")
WaterSensor(strings(0), strings(1).toLong, strings(2).toDouble)
})
//先分区在process,自定义实现KeyedProcessFunction
val value: DataStream[String] = dataStream.keyBy(_.id).process(new KPF)
//输出数据
value.print("data")
env.execute()
}
}
//实现自定义KeyeedProcessFunction
class KPF extends KeyedProcessFunction[String,WaterSensor,String]{
//定义waterState状态,来保存当前接收到的WayerSensor
var waterState:ValueState[WaterSensor]=_
//定义currentState保存创建的定时器的时间值
var currentState:ValueState[Long]=_
//设置生命周期
override def open(parameters: Configuration): Unit = {
//初始化
waterState=getRuntimeContext.getState(new ValueStateDescriptor[WaterSensor]("watersensor",classOf[WaterSensor]))
currentState=getRuntimeContext.getState(new ValueStateDescriptor[Long]("current",classOf[Long]))
}
override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
//如果当前是第一次接收到数据,就直接传入watersensor状态值
if(waterState.value()==null){
println("初始化watersoner")
waterState.update(value)
}else{//否则判度当前获取得数据是否比上一次得温度值高
//当传感器温度上升时,创建一个10秒后执行的定时器
if(value.vc>waterSensor.value().vc && currentState.value()==0){
//获取当前数据处理时间并加上10000毫秒作为定时器执行时间
val timeTS:Long=ctx.timerService().currentProcessingTime()+10000L
//创建定时器
ctx.timerService().registerProcessingTimeTimer(timeTS)
println("已创建定时器")
//保存定时器执行时间
currentState.update(timeTS)
//当在10秒内读取到温度下降的值,解除定时器
}else if(value.vc<waterState.value().vc){
//根据定时器的时间,删除执行对应定时器
ctx.timerService().deleteProcessingTimeTimer(currentState.value())
//清空初始化
currentState.clear()
}
//将每次传入的数据保存下来
println("正在更新waterSensor")
waterState.update(value)
}
}
//实现opTime
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext, out: Collector[String]): Unit = {
//当定时器发生时返回输出内容
out.collect("警告!"+"传感器ID:"+waterState.value().id+"\t检测到温度持续上升!当前温度"+waterState.value().vc)
currentState.clear()
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。