当前位置:   article > 正文

Flink之Souce

souce

1.1 数据源来源于文件

env.readTextFile("").print()
  • 1

1.2 数据源来源于Kafka

1.2.1 创建topic并查看是否创建成功
[root@master ~]# kafka-topics.sh --create --topic raytek --partitions 3 --replication-factor 3 --zookeeper master:2181
Created topic "raytek".
[root@master ~]# kafka-topics.sh --list --zookeeper master:2181
__consumer_offsets
gamelog
myItems_topic5
raytek
test

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
1.2.2 编写kafka消费者代码并运行
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object DataFromKafKa {
  def main(args: Array[String]): Unit = {

    //环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //配置kafka相关参数
    val props = new Properties()
    props.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
    props.setProperty("zookeeper.connect", "master:2181,slave1:2181,slave2:2181")
    props.setProperty("group.id", "group01")
    props.setProperty("auto.offset.reset", "latest")

    //定义kafka源
    env.addSource(new FlinkKafkaConsumer[String]("raytek", new SimpleStringSchema(), props))
      .print()

    //触发启动
    env.execute(this.getClass.getSimpleName)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

编写完成后运行等待生产者生产数据

1.2.3 创建生产者并添加数据
[root@master config]# kafka-console-producer.sh --topic raytek --broker-list master:9092
>raytek_3,36.389,john,1582641123,北京西站-地铁站

  • 1
  • 2
  • 3
1.2.4 查看消费者是否正常消费

在这里插入图片描述

1.3 自定义数据源(温度大于37输出)

1.3.1 准备数据文件raytek.log
raytek_1,36.3,jack,1582641121,北京西站-北广场
raytek_3,36.389,john,1582641123,北京西站-地铁站
raytek_9,37.3,tom,1582641124,北京西站-公交车站
raytek_2,35.4,leon,1582641127,北京西站北广场3号停车场
raytek_3,36.4,kate,1582641128,北京西站-东5号停车场
raytek_3,36.389,alice,1582641129,北京西站南广场-公交站
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
1.3.2 自定义数据源
import com.yuanhanhan.entity.Raytek
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.io.Source

class MySource extends SourceFunction[Raytek]{

  //计数
  var cnt = 0;

  //控制运行标记变量
  var isRunning = true;

  //自定义source逻辑 体温大于37度的输出
  override def run(sourceContext: SourceFunction.SourceContext[Raytek]): Unit = {

    //读取文件
    val list = Source.fromFile("input/raytek.log").getLines().toList

    //过滤数据
    while(cnt < list.size && isRunning){
      val perEle = list(cnt)
      val arr = perEle.split(",")
      val id = arr(0).trim
      val temperature = arr(1).trim.toDouble
      val name = arr(2).trim
      val timestamp = arr(3).trim.toLong
      val location = arr(4).trim
      val instance = Raytek(id, temperature, name, timestamp, location)
      if(temperature > 37){
        sourceContext.collect(instance)
      }
      cnt += 1
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
1.3.4 入口类
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ReadDataFromMySource {
  def main(args: Array[String]): Unit = {
    //环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    import org.apache.flink.api.scala._

    //读取自定义数据源
    env.addSource(new MySource).print()

    //启动
    env.execute(this.getClass.getSimpleName)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
1.3.5 查看结果

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/794432
推荐阅读
相关标签
  

闽ICP备14008679号