当前位置:   article > 正文

flink的souce方法_flink solace

flink solace

主要包括有界流和无界流

有界流包括env.fromCollection,env,readTextFile

无界流包括env.socketTextStream("127.0.0.1", 1111),env.addSource(new FlinkKafkaConsumer010[String]("topic1", new SimpleStringSchema(), properties))

  1. import java.util.Properties
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema
  3. import org.apache.flink.streaming.api.functions.source.SourceFunction
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
  6. import scala.collection.immutable
  7. import scala.util.Random
  8. case class SensorReading(id: String, timestamp: Long, temperature: Double)
  9. object FlinkSourceExe {
  10. def main(args: Array[String]): Unit = {
  11. val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  12. executionEnvironment.setParallelism(1)
  13. val dataList = List(SensorReading("sensor_1", 1558723892, 35.5), SensorReading("sensor_2", 1558709800, 35.6))
  14. // 1、有界流 env.fromCollectioin
  15. val stream1: DataStream[SensorReading] = executionEnvironment.fromCollection(dataList)
  16. // 2、有界流 env.readTextFile
  17. val stream2: DataStream[String] = executionEnvironment.readTextFile("src/main/resources/sensorReading.txt")
  18. //3、无界流 kafka
  19. val properties = new Properties()
  20. properties.setProperty("bootstrap.servers", "localhost:9092")
  21. properties.setProperty("group.id", "consumer-group")
  22. //输出和kafka的并行度一致
  23. val stream3: DataStream[String] = executionEnvironment.addSource(new FlinkKafkaConsumer010[String]("topic1", new SimpleStringSchema(), properties))
  24. //4、无界流 读取socket
  25. val stream4 = executionEnvironment.socketTextStream("127.0.0.1", 1111)
  26. //5、自定义Source
  27. val stream5 = executionEnvironment.addSource(new MySensorSource())
  28. stream5.print()
  29. executionEnvironment.execute()
  30. }
  31. }
  32. class MySensorSource() extends SourceFunction[SensorReading] {
  33. var running: Boolean = true
  34. override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
  35. val random = Random
  36. val curtmp: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(i => {
  37. ("sensor" + i, random.nextDouble() * 100)
  38. })
  39. while (running) {
  40. val readings: immutable.IndexedSeq[SensorReading] = curtmp.map(data => {
  41. SensorReading(data._1, System.currentTimeMillis(), data._2 + random.nextGaussian())
  42. })
  43. readings.foreach(data => {
  44. sourceContext.collect(data)
  45. Thread.sleep(2000)
  46. })
  47. }
  48. }
  49. override def cancel(): Unit = {
  50. running = false
  51. }
  52. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/794445
推荐阅读
相关标签
  

闽ICP备14008679号