Flink Study Notes 03: Flink DataStream Sources



A Flink program consists of three main parts: Source + Transform + Sink. This post summarizes the API operations for the most common sources.

1. Reading data from a collection or from elements

package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


case class SensorReading(id:String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor_1",1547718199,35.8),
      SensorReading("sensor_6",1547718201,15.4),
      SensorReading("sensor_7",1547718199,6.7),
      SensorReading("sensor_10",1547718199,38.1),
    )

    val stream1 = env.fromCollection(dataList)
    stream1.print()
    
    // Read data from elements
    val stream2 = env.fromElements(1.0,35,"hello")
    stream2.print()
    
    env.execute("source test")

  }

}
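Besides a pre-built List, the Scala API can also build quick test streams from an Iterator or from a generated number sequence. A small sketch of both (calls from the Flink 1.10 Scala API, running on the same env as above):

    // fromCollection also accepts an Iterator; the element type information
    // is derived by the implicit imports from org.apache.flink.api.scala._
    val iterStream = env.fromCollection(Iterator.range(0, 5))
    iterStream.print()

    // Built-in helper that emits the numbers 0..99 as a DataStream[Long]
    val seqStream = env.generateSequence(0, 99)
    seqStream.print()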

2. Reading a data stream from a file

package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


case class SensorReading(id:String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from a file
    val inputPath = "D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt"
    val txtDs = env.readTextFile(inputPath)
    txtDs.print()
    
    env.execute("source test")

  }

}
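readTextFile returns the raw lines and reads the file only once before the source finishes; to keep watching a path for new data, StreamExecutionEnvironment.readFile with FileProcessingMode.PROCESSING_CONTINUOUSLY can be used instead. A typical next step is parsing the lines into the SensorReading case class. A minimal sketch, assuming sensor.txt holds comma-separated records such as sensor_1,1547718199,35.8 (the file format is an assumption here):

    // Parse each comma-separated line into a SensorReading
    val sensorDs = txtDs.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    }
    sensorDs.print()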


3. Reading a data stream from Kafka

  • Add the Kafka connector dependency to pom.xml (the _2.12 suffix of the artifact must match your project's Scala version):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
  • The code to connect to Kafka is as follows:
package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.0.52:9092,192.168.0.109:9092,192.168.0.115:9092")
    properties.setProperty("auto.offset.reset","latest")
    properties.setProperty("group.id","consumer-group")
    
    // Create the Kafka consumer for the "sensor" topic
    val source = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
    // Word count over the incoming messages
    val stream3 = env.addSource(source).flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    
    // Print the word-count statistics computed from the Kafka topic
    stream3.print().setParallelism(1)
    
    env.execute("source test")
  }
}
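The word count above just proves the pipeline works. In practice you would usually map the raw Kafka strings into a domain type instead. A minimal sketch, reusing the SensorReading case class from earlier and assuming each Kafka message is a comma-separated record such as sensor_1,1547718199,35.8 (that format is an assumption, not something the topic guarantees):

    // Consume raw strings from the "sensor" topic and parse each record
    val sensorStream = env
      .addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
      .map { line =>
        val fields = line.split(",")
        SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
      }
    sensorStream.print()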
  • Then, on the Kafka side, you can run the console producer to send test messages to the topic:
[root@k8s-node3 bin]# ./kafka-console-producer.sh --broker-list 192.168.0.52:9092,192.168.0.109:9092,192.168.0.115:9092 --topic sensor
>hello world test send message
>hello world test send message

Note: if the job keeps failing to connect to the brokers, the cause may be that the broker metadata Flink fetches from Kafka advertises hostname + port, e.g. k8s-node3:9092. In that case, add k8s-node3 and its IP address to the Windows hosts file.
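For example, assuming k8s-node3 resolves to 192.168.0.52 (use whichever broker actually advertises that hostname), add a line like this to C:\Windows\System32\drivers\etc\hosts:

    192.168.0.52   k8s-node3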

4. Custom sources

The point of a custom source is that, beyond the data sources Flink ships with, we can plug in our own specialized data sources as input streams for Flink.

package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


case class SensorReading(id:String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Custom source
    val stream4 = env.addSource(new MySensorSource)
    stream4.print()
    env.execute("source test")

  }

}

// Custom source class that emits one reading per second
class MySensorSource extends SourceFunction[SensorReading]{

  @volatile var running = true  // written by cancel() from another thread

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    var index = 0
    while(running){
      sourceContext.collect(SensorReading("sensor_" + index, System.currentTimeMillis(), 35.4))
      index = index + 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
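The source above always emits the constant temperature 35.4. A common extension of this example (not part of the original code) is to randomize readings for several sensors with scala.util.Random; a sketch, with the sensor count and noise model chosen arbitrarily for illustration:

    import scala.util.Random

    // Emits one randomized reading per sensor per second, using a
    // Gaussian random walk around each sensor's initial temperature.
    class MyRandomSensorSource extends SourceFunction[SensorReading] {

      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
        val rand = new Random()
        // Initial base temperature for each of 10 sensors
        var curTemps = (1 to 10).map(i => ("sensor_" + i, 60 + rand.nextGaussian() * 20))
        while (running) {
          // Random-walk each temperature, then emit one reading per sensor
          curTemps = curTemps.map { case (id, temp) => (id, temp + rand.nextGaussian()) }
          val ts = System.currentTimeMillis()
          curTemps.foreach { case (id, temp) => ctx.collect(SensorReading(id, ts, temp)) }
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = running = false
    }

It plugs in the same way as before: env.addSource(new MyRandomSensorSource).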