赞
踩
主要包括有界流和无界流
有界流包括env.fromCollection,env,readTextFile
无界流包括env.socketTextStream("127.0.0.1", 1111),env.addSource(new FlinkKafkaConsumer010[String]("topic1", new SimpleStringSchema(), properties))
- import java.util.Properties
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema
- import org.apache.flink.streaming.api.functions.source.SourceFunction
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
-
- import scala.collection.immutable
- import scala.util.Random
-
-
- case class SensorReading(id: String, timestamp: Long, temperature: Double)
-
-
- object FlinkSourceExe {
- def main(args: Array[String]): Unit = {
- val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
- executionEnvironment.setParallelism(1)
- val dataList = List(SensorReading("sensor_1", 1558723892, 35.5), SensorReading("sensor_2", 1558709800, 35.6))
- // 1、有界流 env.fromCollectioin
- val stream1: DataStream[SensorReading] = executionEnvironment.fromCollection(dataList)
- // 2、有界流 env.readTextFile
- val stream2: DataStream[String] = executionEnvironment.readTextFile("src/main/resources/sensorReading.txt")
-
- //3、无界流 kafka
- val properties = new Properties()
- properties.setProperty("bootstrap.servers", "localhost:9092")
- properties.setProperty("group.id", "consumer-group")
- //输出和kafka的并行度一致
- val stream3: DataStream[String] = executionEnvironment.addSource(new FlinkKafkaConsumer010[String]("topic1", new SimpleStringSchema(), properties))
-
- //4、无界流 读取socket
- val stream4 = executionEnvironment.socketTextStream("127.0.0.1", 1111)
-
-
- //5、自定义Source
- val stream5 = executionEnvironment.addSource(new MySensorSource())
-
- stream5.print()
-
-
- executionEnvironment.execute()
- }
- }
-
- class MySensorSource() extends SourceFunction[SensorReading] {
-
- var running: Boolean = true
-
- override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
- val random = Random
- val curtmp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(i => {
- ("sensor" + i, random.nextDouble() * 100)
- })
-
- while (running) {
- val readings: immutable.IndexedSeq[SensorReading] = curtmp.map(data => {
- SensorReading(data._1, System.currentTimeMillis(), data._2 + random.nextGaussian())
- })
- readings.foreach(data => {
- sourceContext.collect(data)
- Thread.sleep(2000)
- })
-
- }
- }
-
-
- override def cancel(): Unit = {
- running = false
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。