当前位置:   article > 正文

【Flink Scala】Flink流处理API_flink readtextfile 废除

flink readtextfile 废除

Environment

在这里插入图片描述

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

创建一个批处理

val env = ExecutionEnvironment.getExecutionEnvironment
  • 1

创建一个流处理

val env = StreamExecutionEnvironment.getExecutionEnvironment
  • 1

如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是 1。
在这里插入图片描述
并行度的优先级是

每个函数 > 整个程序 > 配置文件
  • 1

跳转顶部


Source

从集合读取数据

package Source

import org.apache.flink.streaming.api.scala._

//定义样例类
case class SensorReading(id: String, timeStamp: Long, temperature: Double)

object SourceFromCollection {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //从集合中读取数据
    val dataList = (List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1)
    ))
    //从集合中读取数据
    val stream1 = env.fromCollection(dataList)
    stream1.print()
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

在这里插入图片描述
我们可以发现我们输出的数据和输入数据的顺序是不一样的,这是因为我们没有设置并行度,那么他就会按照CPU的个数来设置并行度,由于是并行的那么输出的顺序就是不一样的


在读取数据的时候有一个特殊的方法fromElements,他读取的数据类不限,你给他说明他就输出说明

    val stream1 = env.fromElements(dataList)
    val stream2 = env.fromElements(1.0,"aa",1)
    stream1.print()
    stream2.print()
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

跳转顶部


从文件读取数据

先创建文件数据

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1

package Source

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object SourceFromFile {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //读取文件
    val filePath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(filePath)
    
    inputStream.print()
    
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在这里插入图片描述

跳转顶部


Kafka读取数据

在与Kafka连接时,需要先导入一个依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

为什么程序中是消费者?

  • 因为我们从Kafka的生产者中输出数据,在通过Kafka的消费者里读取数据
package Source

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Properties

object SourceFromKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //配置项
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    //这边是要读取Kafka的数据,所以算是一个消费者
    /**
     * 第一个泛型:读取过来的数据类型
     * 第一个参数:Kafka名
     * 第二个参数:序列化
     * 第三个参数:配置项
     */
    val stream =
      env.addSource(new FlinkKafkaConsumer011[String]
      ("first", new SimpleStringSchema(), properties))

    stream.print()
    env.execute()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

FlinkKafkaConsumer011的参数解释

  • 泛型是获取到的数据类型
  • 第一个参数是Kafka得到管道名
  • 第二个参数是获取数据类型的序列化
  • 第三个参数是配置项

确保zookeeperKafka开启

  • zookeeper开启命令:zkServer.sh start
  • kafka开启命令:bin/kafka-server-start.sh -daemon config/server.properties

创建Kafka管道

  • 命令如下:bin/kafka-topics.sh --create --zookeeper 192.168.23.69:2181 --replication-factor 3 --partitions 1 --topic first
  • 开生产者:bin/kafka-console-producer.sh --broker-list a:9092 --topic first
    需要修改window的映射文件,否则会报错

在这里插入图片描述

结果展示

在这里插入图片描述

跳转顶部


自定义Source

我首先需要先自定义个Source,需要继承SourceFunction类,泛型是输出的数据类型

/**
 * 自定义Source类
 */
class MySensorSource() extends SourceFunction[SensorReading] {
  //定义一个标志位flag,依赖表示数据源是否正常运行发出数据
  var flag = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    //定义一个随机数发生器
    val rand = new Random()

    //随机生成十个初始温度:(id:temp)
    var curTemp = 1.to(10).map(i => ("sensor_" + 1, rand.nextDouble() * 100))

    //定义无限循环,不停的产生数据,出发呗cancel
    while (flag) {
      //在上次温度的基础上更新温度值
      curTemp = curTemp.map(
        data => (data._1, data._2 + rand.nextGaussian())
      )
      //获取当前时间戳,将时间戳撞倒sensor,然后使用context发出数据
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        data => sourceContext.collect(SensorReading(data._1, curTime, data._2))
      )
      //.间隔一段时间
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = flag = false
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

使用addSource方法调用类

package Source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object SourceFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new MySensorSource())

    stream.print()
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在这里插入图片描述

跳转顶部


转换算子Transform

简单的转换算子(Map、FlatMAp和Filter)

Map转换操作:
在这里插入图片描述
使用语法

val streamMap = stream.map { x => x * 2 }
  • 1

如图可见,map算子是将数据从一种形态转换成另一种形态,一对一的转换


flatMap算子的使用
在这里插入图片描述

flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]

例如: flatMap(List(1,2,3))(i ⇒ List(i,i))

结果是 List(1,1,2,2,3,3), 而 List("a b", "c d").flatMap(line ⇒ line.split(" "))

结果是 List(a, b, c, d)。

val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
  • 1
  • 2
  • 3

flatMap算子个一对多的算子


Filter算子
在这里插入图片描述

val streamFilter = stream.filter{
x => x == 1
}
  • 1
  • 2
  • 3

跳转顶部


键控流的转换算子(keyBy、滚动聚合和reduce)

keyBy算子
在这里插入图片描述
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key的元素,在内部以hash的形式实现的。

最后的结果就是同一个key必定在同一个分区,但是不同的分区里面不一定是同一个key,因为分区数有限


滚动聚合算子

  • 滚动聚合算子是针对KeyedStream的每一个支流做聚合
算子作用
sum求和
min最小值
max最大值
minBy最小值的整条数据
maxBy最大值的整条数据

我们来测试一下minBymin的区别,测试数据如下

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.7

package transform

import Source.SensorReading
import org.apache.flink.streaming.api.scala._

object transformatTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )
    //分组聚合输出每个传感器温度最小值
    val aggStream = dataStream
      .keyBy("id") //根据ID分组
      .min(2) //第二个最小值,也可以写属性名

    aggStream.print()
    env.execute()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在这里插入图片描述
我们可以看到虽然最小值找了出来,但是时间戳数据确实错的,那我们将min改为minBy来查看结果是否一致

在这里插入图片描述
这个时候数据就匹配了,说明minBy返回的是整条数据


reduce算子

KeyedStreamDataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

计算温度的最小值和时间的最新值

    val resultStream = dataStream
      .keyBy("id")
      .reduce(
        (curState, newDate) =>
          SensorReading(curState.id, newDate.timeStamp, curState.temperature.min(newDate.temperature))
      )
    resultStream.print()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述
在例题中:curState是之前聚合处理留下来的结果

自定义实现一个Reduce,需要继承ReduceFunction

class MyReduceFunction extends ReduceFunction[SensorReading] {
  /**
   *
   * @param t  是之前累计运算的结果
   * @param t1 是新传来等待处理的数据
   * @return
   */
  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = SensorReading(t.id, t1.timeStamp, t.temperature.min(t1.temperature))
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

跳转顶部


多流的转换算子(Split、select、connect、CoMap和Union)

分流操作

  • Spilt,其实Spilt在实际上并没有将流分成俩快。而是类似于分组的操作

    在这里插入图片描述

  • DataStreamSplitStream:根据某些特征把一个 DataStream 拆分成两个或者=多个 DataStream

  • Select
    在这里插入图片描述

  • SplitStreamDataStream:从一个 SplitStream 中获取一个或者多个DataStream

需求练习:以三十度为界,将数据拆分成两个流

package transform

import Source.SensorReading
import org.apache.flink.streaming.api.scala._

object SplitAndSelect {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )
    //分流操作
    val spiltStream = dataStream
      .split(
        data => {
          if (data.temperature > 30.0) Seq("High") else Seq("low")
        }
      )
    //获取流
    val HighTempStream = spiltStream.select("High")
    val LowTempStream = spiltStream.select("low")
    val allStream = spiltStream.select("High", "low")

    HighTempStream.print("high")
    LowTempStream.print("low")
    allStream.print("all")

    env.execute()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

在这里插入图片描述


合流操作
在这里插入图片描述

  • DataStream,DataStreamConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
    在这里插入图片描述
  • ConnectedStreamsDataStream:作用于 ConnectedStreams上,功能与 mapflatMap 一样,对 ConnectedStreams 中的每一个 Stream分别进行mapflatMap处理,也就是说,如有两个stream在内,在使用CoMap时必须定义两个Map的用法
    //将数据类型转换一下
    val warningStream = HighTempStream.map(data => (data.id, data.temperature))

    val connectStream = warningStream.connect(LowTempStream)

    //CoMap对数据继续处理
    val CoMapStream = connectStream
      .map(
        warningData => (warningData._1, warningData._2, "warning"),
        lowData => (lowData.id, "healthy"
        )
      )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

union算子
在这里插入图片描述
DataStreamDataStream:对两个或者两个以上的 DataStream 进行union 操作,产生一个包含所有 DataStream元素的新 DataStream

ConnectUnion 区别:

  • Union 之前两个流的类型必须是一样,Connect可以不一样,在之后的 coMap中再去调整成为一样的。

  • Connect只能操作两个流,Union可以操作多个

跳转顶部


支持的数据类型

Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

Flink支持 JavaScala 中所有常见数据类型。使用最广泛的类型有以下几种

基础数据类型

  • Flink 支持所有的 JavaScala基础数据类型,Int, Double, Long, String, …
    val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
    numbers.map( n => n + 1 )
    
    • 1
    • 2

JavaScala元组(Tuples

  • 样例
    val persons: DataStream[(String, Integer)] = env.fromElements(
    ("Adam", 17),
    ("Sarah", 23) )
    persons.filter(p => p._2 > 18
    
    • 1
    • 2
    • 3
    • 4

Scala样例类

  • 格式
    	case class Person(name: String, age: Int)
    	val persons: DataStream[Person] = env.fromElements(
    		Person("Adam", 17),
    		Person("Sarah", 23) )
    	persons.filter(p => p.age > 18)
    
    • 1
    • 2
    • 3
    • 4
    • 5

** Java 简单对象(POJOs)**

  • 格式如下
    public class Person {
    	public String name;
    	public int age;
    	public Person() {}
    	public Person(String name, int age) {
    		this.name = name;
    		this.age = age;
    	}
    }
    DataStream<Person> persons = env.fromElements(
    	new Person("Alex", 42),
    	new Person("Wendy", 23));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

其它(Arrays, Lists, Maps, Enums) 等

  • FlinkJavaScala 中的一些特殊目的的类型也都是支持的,比如 JavaArrayListHashMapEnum 等等。

跳转顶部


自定义UDF函数

自定义函数和匿名函数

Flink 暴露了所有 udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction 等等

Flink想要自定义函数,只要使自定义类继承对应的Function类然后重写方法即可

例如,我们下面自定义Filter函数,我们只保留以sensor_1开头的数据

class MyFilterFunction extends FilterFunction[SensorReading] {
  override def filter(t: SensorReading): Boolean =
    t.id.startsWith("sensor_1")
}
  • 1
  • 2
  • 3
  • 4

函数的使用

object MyFilterTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )
    val result = dataStream.filter(new MyFilterFunction)
    result.print()
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在这里插入图片描述
结果上显示,我们的筛选使成功的


其实有些时候我们不需要非要创建一个类来实现,我们可以使用匿名类的方式

    val result = dataStream.filter(new FilterFunction[SensorReading] {
      override def filter(t: SensorReading) = t.id.startsWith("sensor_1")
    })
  • 1
  • 2
  • 3

我们 filter 的字符串"sensor_!"还可以当作参数传进去

val flinkTweets = tweets.filter(new KeywordFilter("sensor_1"))

class KeywordFilter(keyWord: SensorReading) extends FilterFunction[SensorReading] {
	override def filter(value: SensorReading): Boolean = {
		value.id.startsWith(keyWord)
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

跳转顶部


富函数

“富函数”是 DataStream API提供的一个函数类的接口,所有 Flink函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  • open()方法是rich function 的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。

  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

  • getRuntimeContext()方法提供了函数的 RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及 state状态

在自定义富函数的可以重写方法
在这里插入图片描述
非富函数可重写
在这里插入图片描述

跳转顶部


Sink

存储在文件中

Flink中可以将数据保存在本地文件中

    dataStream.writeAsCsv("src/main/resources/out.csv")
  • 1

不仅仅只有上面的一个还有如下的方法
在这里插入图片描述

我们通过图片可以看到,上面的方法大多都是已经过时了的,这里我们提供一个新的方法

    dataStream.addSink(StreamingFileSink.forRowFormat(
      new Path("src/main/resources/out.csv"),
      new SimpleStringEncoder[SensorReading]()
    ).build())
  • 1
  • 2
  • 3
  • 4

其中第一个参数就是指定文件系统与其存储的路径,第二个参数就是存储的字符编码

在这里插入图片描述
我们可以发现,最后的输出结果并不是一个文件,而是一个文件夹,文件夹内之所以有多个文件是因为并行度的问题

使用Kafka作为Sink

为什么程序里的Kafka是生产者?

  • 程序处理好的数据通过代码里的Kafka生产给Kafka的消费者输出
package Sink

import Source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object SInkFromKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度
    env.setParallelism(1)

    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      }
    )

    dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "first", new SimpleStringSchema()))
    
    env.execute()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

同样的创建Kafka的主题:

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic first
  • 1

创建一个输出到控制台的消费者

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
  • 1

在这里插入图片描述

跳转顶部


使用Kafka形成一个管道

我们从一个Kafka中读取数据,将数据处理后在输出到另一个Kafka

package Sink

import Source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

import java.util.Properties

object SInkToKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度
    env.setParallelism(3)

//    val inputPath = "src/main/resources/SensorReading"
//    val inputStream = env.readTextFile(inputPath)

    //从Kafka读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    //这边是要读取Kafka的数据,所以算是一个消费者
    /**
     * 第一个泛型:读取过来的数据类型
     * 第一个参数:Kafka名
     * 第二个参数:序列化
     * 第三个参数:配置项
     */
    val stream =
      env.addSource(new FlinkKafkaConsumer011[String]
      ("first", new SimpleStringSchema(), properties))

    //转换成样例类类型
    val dataStream = stream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      }
    )

    dataStream.addSink(new FlinkKafkaProducer011[String]("master:9092", "sensor", new SimpleStringSchema()))

    env.execute()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

**我们此时需要两个Kafka的主题,三者的启动顺序是,先启动生产者Kafka、再启动消费者Kafka,最后启动程序在这里插入图片描述
**

JDBC自定义Sink

导入依赖

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.44</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

在数据库中创建表

 CREATE TABLE `temperatures` (
  `sensor` varchar(255) DEFAULT NULL,
  `temp` double(255,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
  • 1
  • 2
  • 3
  • 4
package Sink

import Source.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement}

object SinkToMySql {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度
    env.setParallelism(3)

    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    dataStream.addSink(new MyJdbcSink())

    env.execute()
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open 主要是创建连接
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/Flink",
      "root", "200028")
    insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES(?, ?)")
    updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  }

  // 调用连接,执行 sql
  override def invoke(value: SensorReading, context:
  SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

结果
在这里插入图片描述

跳转顶部


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/972401
推荐阅读
相关标签
  

闽ICP备14008679号