Reading data from a collection:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Sensor extends App {
  // Create the execution environment
  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Import the implicit TypeInformation conversions
  import org.apache.flink.api.scala._

  // Build a DataStream from a collection
  private val stream: DataStream[SensorReading] = env.fromCollection(List(
    SensorReading("sensor1", 1547718199, 38.222),
    SensorReading("sensor2", 1547718199, 40.222),
    SensorReading("sensor3", 1747718199, 56.222)
  ))

  // Print the DataStream
  stream.print().setParallelism(1)

  // Execute the job
  env.execute()
}

case class SensorReading(id: String, timestamp: Long, temperature: Double)
Reading from a text file (the path was left empty in the original; a fuller sketch follows the socket example below):

val value1: DataStream[String] = env.readTextFile("")
Reading from a socket (host and port must be defined elsewhere; see the sketch below):

val textDstream: DataStream[String] = env.socketTextStream(host, port)
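A minimal runnable sketch covering both the file and the socket source. The input path, host, and port here are illustrative assumptions, not values from this article; for the socket you can start a local netcat server with nc -lk 7777 first.

import org.apache.flink.streaming.api.scala._

object FileAndSocketSource extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // File source: the path is a placeholder, point it at a real local or HDFS file
  val fileStream: DataStream[String] = env.readTextFile("/tmp/sensor.txt")

  // Socket source: host and port are placeholders, e.g. run `nc -lk 7777` locally first
  val socketStream: DataStream[String] = env.socketTextStream("localhost", 7777)

  fileStream.print().setParallelism(1)
  socketStream.print().setParallelism(1)

  env.execute("file and socket source demo")
}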
Reading from Kafka: first add the pom dependencies. The flink-connector-kafka dependency is the one that handles Kafka; note that its version must match the Flink version!
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>
Test code:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaSource extends App {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
  properties.setProperty("group.id", "consumer-group")
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("auto.offset.reset", "latest")

  private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Create a Kafka consumer that reads the "cat" topic
  private val consumer = new FlinkKafkaConsumer011[String]("cat", new SimpleStringSchema(), properties)

  import org.apache.flink.api.scala._
  private val stream: DataStream[String] = env.addSource(consumer)
  stream.print().setParallelism(1)

  // Execute the job
  env.execute()
}
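To verify the consumer with the broker list and topic used above, you can push a few test messages with Kafka's console producer, e.g. kafka-console-producer.sh --broker-list mypc01:9092 --topic cat, and watch them appear in the job's stdout.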
How do Flink + Kafka achieve exactly-once semantics?
Flink uses checkpoints to record how far the data has been processed. The JobManager coordinates the TaskManagers to take checkpoints, and the checkpoint state is stored in a StateBackend. The default StateBackend is in-memory, but it can be switched to a file-based backend for durable, persistent storage.
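A hedged sketch of swapping the default in-memory backend for a file-based one; the checkpoint interval and path below are illustrative assumptions, not values from this article.

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointConfig extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Take a checkpoint every 5 seconds (interval is illustrative)
  env.enableCheckpointing(5000)

  // Replace the default in-memory StateBackend with a file-based one;
  // the path is a placeholder - in practice point it at HDFS or another shared filesystem
  env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))
}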
Execution is effectively a two-phase commit: each operator performs a "pre-commit" as it completes, and once the sink operation finishes, a final "commit" is issued. If execution fails, the pre-commit is discarded.
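A hedged sketch of the sink side of this two-phase commit, using the Kafka 0.11 producer in EXACTLY_ONCE mode. The topic name, checkpoint interval, and transaction timeout are illustrative assumptions.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceKafkaSink extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Exactly-once checkpointing is what drives the pre-commit / commit cycle of the sink
  env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

  val props = new Properties()
  props.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")
  // Must stay below the broker's transaction.max.timeout.ms
  props.setProperty("transaction.timeout.ms", "60000")

  // EXACTLY_ONCE makes the producer write inside a Kafka transaction that is
  // pre-committed at checkpoint time and committed once the checkpoint completes
  val producer = new FlinkKafkaProducer011[String](
    "dog", // target topic (illustrative)
    new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
    props,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

  env.fromElements("a", "b", "c").addSink(producer)
  env.execute()
}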
If the job crashes, recovery goes through the StateBackend, and only operations that were fully committed can be restored.
Flink can consume all the common sources, such as files, collections, Kafka, sockets, and so on.