赞
踩
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment
会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
创建一个批处理
val env = ExecutionEnvironment.getExecutionEnvironment
创建一个流处理
val env = StreamExecutionEnvironment.getExecutionEnvironment
如果没有设置并行度,会以flink-conf.yaml
中的配置为准,默认是 1。
并行度的优先级是
每个函数 > 整个程序 > 配置文件
package Source import org.apache.flink.streaming.api.scala._ //定义样例类 case class SensorReading(id: String, timeStamp: Long, temperature: Double) object SourceFromCollection { def main(args: Array[String]): Unit = { //创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //从集合中读取数据 val dataList = (List( SensorReading("sensor_1", 1547718199, 35.8), SensorReading("sensor_6", 1547718201, 15.4), SensorReading("sensor_7", 1547718202, 6.7), SensorReading("sensor_10", 1547718205, 38.1) )) //从集合中读取数据 val stream1 = env.fromCollection(dataList) stream1.print() env.execute() } }
我们可以发现我们输出的数据和输入数据的顺序是不一样的,这是因为我们没有设置并行度,那么他就会按照CPU
的个数来设置并行度,由于是并行的那么输出的顺序就是不一样的
在读取数据的时候有一个特殊的方法fromElements
,他读取的数据类不限,你给他说明他就输出说明
val stream1 = env.fromElements(dataList)
val stream2 = env.fromElements(1.0,"aa",1)
stream1.print()
stream2.print()
先创建文件数据
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
package Source import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object SourceFromFile { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //读取文件 val filePath = "src/main/resources/SensorReading" val inputStream = env.readTextFile(filePath) inputStream.print() env.execute() } }
在与Kafka
连接时,需要先导入一个依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
为什么程序中是消费者?
Kafka
的生产者中输出数据,在通过Kafka
的消费者里读取数据package Source import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.api.scala._ import java.util.Properties object SourceFromKafka { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //配置项 val properties = new Properties() properties.setProperty("bootstrap.servers", "master:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") //这边是要读取Kafka的数据,所以算是一个消费者 /** * 第一个泛型:读取过来的数据类型 * 第一个参数:Kafka名 * 第二个参数:序列化 * 第三个参数:配置项 */ val stream = env.addSource(new FlinkKafkaConsumer011[String] ("first", new SimpleStringSchema(), properties)) stream.print() env.execute() } }
FlinkKafkaConsumer011
的参数解释
Kafka
得到管道名确保zookeeper
和Kafka
开启
zookeeper
开启命令:zkServer.sh start
kafka
开启命令:bin/kafka-server-start.sh -daemon config/server.properties
创建Kafka
管道
bin/kafka-topics.sh --create --zookeeper 192.168.23.69:2181 --replication-factor 3 --partitions 1 --topic first
bin/kafka-console-producer.sh --broker-list a:9092 --topic first
window
的映射文件,否则会报错结果展示
我首先需要先自定义个Source
,需要继承SourceFunction
类,泛型是输出的数据类型
/** * 自定义Source类 */ class MySensorSource() extends SourceFunction[SensorReading] { //定义一个标志位flag,依赖表示数据源是否正常运行发出数据 var flag = true override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = { //定义一个随机数发生器 val rand = new Random() //随机生成十个初始温度:(id:temp) var curTemp = 1.to(10).map(i => ("sensor_" + 1, rand.nextDouble() * 100)) //定义无限循环,不停的产生数据,出发呗cancel while (flag) { //在上次温度的基础上更新温度值 curTemp = curTemp.map( data => (data._1, data._2 + rand.nextGaussian()) ) //获取当前时间戳,将时间戳撞倒sensor,然后使用context发出数据 val curTime = System.currentTimeMillis() curTemp.foreach( data => sourceContext.collect(SensorReading(data._1, curTime, data._2)) ) //.间隔一段时间 Thread.sleep(1000) } } override def cancel(): Unit = flag = false }
使用addSource
方法调用类
package Source import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala._ import scala.util.Random object SourceFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.addSource(new MySensorSource()) stream.print() env.execute() } }
Map
转换操作:
使用语法
val streamMap = stream.map { x => x * 2 }
如图可见,map
算子是将数据从一种形态转换成另一种形态,一对一的转换
flatMap
算子的使用
flatMap
的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
例如: flatMap(List(1,2,3))(i ⇒ List(i,i))
结果是 List(1,1,2,2,3,3)
, 而 List("a b", "c d").flatMap(line ⇒ line.split(" "))
结果是 List(a, b, c, d)。
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
flatMap
算子个一对多的算子
Filter
算子
val streamFilter = stream.filter{
x => x == 1
}
keyBy
算子
DataStream → KeyedStream
:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key
的元素,在内部以hash
的形式实现的。
最后的结果就是同一个key
必定在同一个分区,但是不同的分区里面不一定是同一个key
,因为分区数有限
滚动聚合算子
KeyedStream
的每一个支流做聚合算子 | 作用 |
---|---|
sum | 求和 |
min | 最小值 |
max | 最大值 |
minBy | 最小值的整条数据 |
maxBy | 最大值的整条数据 |
我们来测试一下minBy
和min
的区别,测试数据如下
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.7
package transform import Source.SensorReading import org.apache.flink.streaming.api.scala._ object transformatTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val inputPath = "src/main/resources/SensorReading" val inputStream = env.readTextFile(inputPath) //转换成样例类类型 val dataStream = inputStream.map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) //分组聚合输出每个传感器温度最小值 val aggStream = dataStream .keyBy("id") //根据ID分组 .min(2) //第二个最小值,也可以写属性名 aggStream.print() env.execute() } }
我们可以看到虽然最小值找了出来,但是时间戳数据确实错的,那我们将min
改为minBy
来查看结果是否一致
这个时候数据就匹配了,说明minBy
返回的是整条数据
reduce
算子
KeyedStream
→ DataStream
:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
计算温度的最小值和时间的最新值
val resultStream = dataStream
.keyBy("id")
.reduce(
(curState, newDate) =>
SensorReading(curState.id, newDate.timeStamp, curState.temperature.min(newDate.temperature))
)
resultStream.print()
在例题中:curState
是之前聚合处理留下来的结果
自定义实现一个Reduce
,需要继承ReduceFunction
类
class MyReduceFunction extends ReduceFunction[SensorReading] {
/**
*
* @param t 是之前累计运算的结果
* @param t1 是新传来等待处理的数据
* @return
*/
override def reduce(t: SensorReading, t1: SensorReading): SensorReading = SensorReading(t.id, t1.timeStamp, t.temperature.min(t1.temperature))
}
分流操作
Spilt
,其实Spilt
在实际上并没有将流分成俩快。而是类似于分组的操作
DataStream
→ SplitStream
:根据某些特征把一个 DataStream 拆分成两个或者=多个 DataStream
Select
SplitStream
→DataStream
:从一个 SplitStream
中获取一个或者多个DataStream
。
需求练习:以三十度为界,将数据拆分成两个流
package transform import Source.SensorReading import org.apache.flink.streaming.api.scala._ object SplitAndSelect { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val inputPath = "src/main/resources/SensorReading" val inputStream = env.readTextFile(inputPath) //转换成样例类类型 val dataStream = inputStream.map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) //分流操作 val spiltStream = dataStream .split( data => { if (data.temperature > 30.0) Seq("High") else Seq("low") } ) //获取流 val HighTempStream = spiltStream.select("High") val LowTempStream = spiltStream.select("low") val allStream = spiltStream.select("High", "low") HighTempStream.print("high") LowTempStream.print("low") allStream.print("all") env.execute() } }
合流操作
DataStream
,DataStream
→ ConnectedStreams
:连接两个保持他们类型的数据流,两个数据流被 Connect
之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。ConnectedStreams
→ DataStream
:作用于 ConnectedStreams
上,功能与 map
和 flatMap
一样,对 ConnectedStreams
中的每一个 Stream
分别进行map
和 flatMap
处理,也就是说,如有两个stream
在内,在使用CoMap
时必须定义两个Map
的用法 //将数据类型转换一下
val warningStream = HighTempStream.map(data => (data.id, data.temperature))
val connectStream = warningStream.connect(LowTempStream)
//CoMap对数据继续处理
val CoMapStream = connectStream
.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy"
)
)
union
算子
DataStream
→ DataStream
:对两个或者两个以上的 DataStream
进行union
操作,产生一个包含所有 DataStream
元素的新 DataStream
Connect
与 Union
区别:
Union
之前两个流的类型必须是一样,Connect
可以不一样,在之后的 coMap
中再去调整成为一样的。
Connect
只能操作两个流,Union
可以操作多个
Flink
流应用程序处理的是以数据对象表示的事件流。所以在Flink
内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink
使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink
还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda
函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink
支持 Java
和Scala
中所有常见数据类型。使用最广泛的类型有以下几种
基础数据类型
Flink
支持所有的 Java
和 Scala
基础数据类型,Int
, Double
, Long
, String
, …val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
Java
和 Scala
元组(Tuples
)
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18
Scala
样例类
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
** Java
简单对象(POJOs
)**
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
其它(Arrays, Lists, Maps, Enums
) 等
Flink
对 Java
和 Scala
中的一些特殊目的的类型也都是支持的,比如 Java
的ArrayList
,HashMap
,Enum
等等。Flink
暴露了所有 udf
函数的接口(实现方式为接口或者抽象类)。例如MapFunction
, FilterFunction
, ProcessFunction
等等
Flink
想要自定义函数,只要使自定义类继承对应的Function
类然后重写方法即可
例如,我们下面自定义Filter
函数,我们只保留以sensor_1
开头的数据
class MyFilterFunction extends FilterFunction[SensorReading] {
override def filter(t: SensorReading): Boolean =
t.id.startsWith("sensor_1")
}
函数的使用
object MyFilterTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val inputPath = "src/main/resources/SensorReading" val inputStream = env.readTextFile(inputPath) //转换成样例类类型 val dataStream = inputStream.map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) val result = dataStream.filter(new MyFilterFunction) result.print() env.execute() } }
结果上显示,我们的筛选使成功的
其实有些时候我们不需要非要创建一个类来实现,我们可以使用匿名类的方式
val result = dataStream.filter(new FilterFunction[SensorReading] {
override def filter(t: SensorReading) = t.id.startsWith("sensor_1")
})
我们 filter
的字符串"sensor_!"还可以当作参数传进去
val flinkTweets = tweets.filter(new KeywordFilter("sensor_1"))
class KeywordFilter(keyWord: SensorReading) extends FilterFunction[SensorReading] {
override def filter(value: SensorReading): Boolean = {
value.id.startsWith(keyWord)
}
}
“富函数”是 DataStream API
提供的一个函数类的接口,所有 Flink
函数类都有其 Rich
版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function
有一个生命周期的概念。典型的生命周期方法有:
open()
方法是rich function
的初始化方法,当一个算子例如map
或者filter
被调用之前open()
会被调用。
close()
方法是生命周期中的最后一个调用的方法,做一些清理工作。
getRuntimeContext()
方法提供了函数的 RuntimeContext
的一些信息,例如函数执行的并行度,任务的名字,以及 state
状态
在自定义富函数的可以重写方法
非富函数可重写
Flink
中可以将数据保存在本地文件中
dataStream.writeAsCsv("src/main/resources/out.csv")
不仅仅只有上面的一个还有如下的方法
我们通过图片可以看到,上面的方法大多都是已经过时了的,这里我们提供一个新的方法
dataStream.addSink(StreamingFileSink.forRowFormat(
new Path("src/main/resources/out.csv"),
new SimpleStringEncoder[SensorReading]()
).build())
其中第一个参数就是指定文件系统与其存储的路径,第二个参数就是存储的字符编码
我们可以发现,最后的输出结果并不是一个文件,而是一个文件夹,文件夹内之所以有多个文件是因为并行度的问题
为什么程序里的Kafka是生产者?
Kafka
生产给Kafka
的消费者输出package Sink import Source.SensorReading import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 object SInkFromKafka { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 env.setParallelism(1) val inputPath = "src/main/resources/SensorReading" val inputStream = env.readTextFile(inputPath) //转换成样例类类型 val dataStream = inputStream.map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString } ) dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "first", new SimpleStringSchema())) env.execute() } }
同样的创建Kafka
的主题:
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic first
创建一个输出到控制台的消费者
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
我们从一个Kafka中读取数据,将数据处理后在输出到另一个Kafka
package Sink import Source.SensorReading import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011} import java.util.Properties object SInkToKafka { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 env.setParallelism(3) // val inputPath = "src/main/resources/SensorReading" // val inputStream = env.readTextFile(inputPath) //从Kafka读取数据 val properties = new Properties() properties.setProperty("bootstrap.servers", "master:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") //这边是要读取Kafka的数据,所以算是一个消费者 /** * 第一个泛型:读取过来的数据类型 * 第一个参数:Kafka名 * 第二个参数:序列化 * 第三个参数:配置项 */ val stream = env.addSource(new FlinkKafkaConsumer011[String] ("first", new SimpleStringSchema(), properties)) //转换成样例类类型 val dataStream = stream.map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString } ) dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "sensor", new SimpleStringSchema())) env.execute() } }
**我们此时需要两个Kafka
的主题,三者的启动顺序是,先启动生产者Kafka
、再启动消费者Kafka
,最后启动程序
**
导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
在数据库中创建表
CREATE TABLE `temperatures` (
`sensor` varchar(255) DEFAULT NULL,
`temp` double(255,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
package Sink import Source.SensorReading import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import java.sql.{Connection, DriverManager, PreparedStatement} object SinkToMySql { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 env.setParallelism(3) val inputPath = "src/main/resources/SensorReading" val inputStream = env.readTextFile(inputPath) //转换成样例类类型 val dataStream = inputStream.map( data => { val arr = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) dataStream.addSink(new MyJdbcSink()) env.execute() } } class MyJdbcSink() extends RichSinkFunction[SensorReading] { var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ // open 主要是创建连接 override def open(parameters: Configuration): Unit = { super.open(parameters) conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/Flink", "root", "200028") insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES(?, ?)") updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?") } // 调用连接,执行 sql override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = { updateStmt.setDouble(1, value.temperature) updateStmt.setString(2, value.id) updateStmt.execute() if (updateStmt.getUpdateCount == 0) { insertStmt.setString(1, value.id) insertStmt.setDouble(2, value.temperature) insertStmt.execute() } } override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } }
结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。