
Flink API: Getting Started with Sources


Building a stream from a collection

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Sensor extends App {
  // Create the execution environment
  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Import the implicit conversions (provides TypeInformation for SensorReading)
  import org.apache.flink.api.scala._
  // Build a DataStream from a collection
  private val stream: DataStream[SensorReading] = env.fromCollection(List(
    SensorReading("sensor1", 1547718199, 38.222),
    SensorReading("sensor2", 1547718199, 40.222),
    SensorReading("sensor3", 1747718199, 56.222)
  ))
  // Print the DataStream
  stream.print().setParallelism(1)
  // Execute the job
  env.execute()
}

case class SensorReading(id: String, timestamp: Long, temperature: Double)
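A related alternative worth knowing: env.fromElements builds a stream directly from individual values, without wrapping them in a List first. A minimal sketch, reusing the environment and implicits from the example above (the readings are illustrative values, not data from the original article):

// Build a DataStream directly from individual elements
val elemStream: DataStream[SensorReading] = env.fromElements(
  SensorReading("sensor1", 1547718199, 38.222),
  SensorReading("sensor2", 1547718199, 40.222)
)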

Reading data from a file

val value1: DataStream[String] = env.readTextFile("")
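readTextFile returns one String element per line of the file. A slightly fuller sketch, assuming the same environment and implicits as the collection example above and a hypothetical file sensor.txt whose lines look like "sensor1,1547718199,38.222":

// Hypothetical path; each line is assumed to be "id,timestamp,temperature"
val lines: DataStream[String] = env.readTextFile("sensor.txt")
val readings: DataStream[SensorReading] = lines.map { line =>
  val fields = line.split(",")
  SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
}
readings.print()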

Reading from a socket

 val textDstream: DataStream[String] = env.socketTextStream(host, port)
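A self-contained sketch of a socket source program, assuming a hypothetical localhost:7777 endpoint (for a quick local test, start a socket with nc -lk 7777 and type lines into it):

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object SocketSource extends App {
  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  import org.apache.flink.api.scala._
  // Hypothetical host/port; each line sent to the socket becomes one String element
  private val textDstream: DataStream[String] = env.socketTextStream("localhost", 7777)
  textDstream.print().setParallelism(1)
  env.execute()
}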

Reading from Kafka

First add the Maven dependencies. The flink-connector-kafka artifact is what provides the Kafka connector; note that its version must match your Flink version!

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>

Test code

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

object KafkaSource extends App {
  // Kafka consumer configuration
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
  properties.setProperty("group.id", "consumer-group")
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("auto.offset.reset", "latest")
  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Create a Kafka consumer that reads from the cat topic
  private val consumer = new FlinkKafkaConsumer011[String]("cat", new SimpleStringSchema(), properties)

  import org.apache.flink.api.scala._
  private val stream: DataStream[String] = env.addSource(consumer)
  stream.print().setParallelism(1)
  // Execute the job
  env.execute()
}
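To verify the consumer locally, produce a few messages to the cat topic (for example with Kafka's console producer, kafka-console-producer.sh --broker-list mypc01:9092 --topic cat); each message should then appear on the job's standard output.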

How do Flink and Kafka achieve exactly-once semantics?

Flink uses checkpoints to record whether data has been fully processed.

The JobManager coordinates the TaskManagers to take checkpoints, which are stored in a StateBackend. By default the StateBackend is in-memory, but it can be switched to a file-based backend for durable storage.

Execution is effectively a two-phase commit: as each operator finishes it performs a "pre-commit", and only once the sink operation has completed is the final "confirm commit" issued; if execution fails, the pre-committed work is discarded.

If the job crashes, it is restored from the StateBackend, and only operations that were confirm-committed can be recovered.
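A minimal sketch of how these pieces are wired together on the Flink 1.7 / Kafka 0.11 versions used above; the checkpoint interval, HDFS path, and output topic here are hypothetical placeholders, not values from the original article:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceSketch extends App {
  private val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Checkpoint every 5 seconds in exactly-once mode (interval is hypothetical)
  env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
  // Replace the default in-memory StateBackend with a file-based one (path is hypothetical)
  env.setStateBackend(new FsStateBackend("hdfs://mypc01:9000/flink/checkpoints"))

  val producerProps = new Properties()
  producerProps.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
  // Pre-committed data lives inside a Kafka transaction until the checkpoint completes,
  // so the transaction timeout must exceed the maximum checkpoint duration
  producerProps.setProperty("transaction.timeout.ms", "900000")

  // Semantic.EXACTLY_ONCE makes the sink participate in the two-phase commit described above
  private val producer = new FlinkKafkaProducer011[String](
    "cat-out", // hypothetical output topic
    new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
    producerProps,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

  env.fromElements("a", "b", "c").addSink(producer)
  env.execute()
}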

Summary

  • Flink can consume a variety of common sources, such as files, collections, Kafka, sockets, and so on.