
Flink Streaming: Integrating with Kafka

1: Introduction

        In real-time processing, the data source in production is almost always Kafka, so let's look at how to integrate Kafka with Flink.

        Flink provides a dedicated Kafka connector for reading data from and writing data to Kafka topics. When consuming from Kafka, Flink does not rely on the offsets committed to the Kafka consumer group to guarantee exactly-once semantics; instead, it tracks the offsets internally and stores them in its checkpoints, and that checkpointed state is what provides exactly-once. In addition, Flink spreads the partitions of a Kafka topic across the parallel instances of the source, so every partition of the topic gets consumed.

        Official Flink documentation on the Kafka connector:

        https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
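
        The consumer's start position can also be chosen explicitly. Below is a minimal sketch of this point (the object name KafkaStartPositionSketch is made up for illustration; the broker addresses and the topic name test follow the examples later in this article). Note that when a job is restored from a checkpoint or savepoint, the offsets stored in Flink's own state take precedence over these start-position settings.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaStartPositionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "con1")

    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema, prop)
    consumer.setStartFromGroupOffsets() // default: start from the offsets committed for the group
    // consumer.setStartFromEarliest()  // ignore committed offsets, read the topic from the beginning
    // consumer.setStartFromLatest()    // ignore committed offsets, read only new records

    env.addSource(consumer).print()
    env.execute("KafkaStartPositionSketch")
  }
}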

2: Add the POM dependencies

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

3: Using Kafka as a Flink source

        In practice, Kafka is almost always the source that Flink jobs read from.

3.1: Create a Kafka topic

        With the Kafka cluster installed and started, run the following command on node01 to create a Kafka topic named test:

kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181

3.2: Code implementation

import java.util.Properties

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * Using Kafka as a Flink source
 */
object FlinkKafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._

    // checkpoint configuration
    env.enableCheckpointing(100)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // state backend: incremental RocksDB checkpoints on HDFS
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, prop)
    // commit offsets back to Kafka when a checkpoint completes (useful for monitoring;
    // on recovery Flink restores from the offsets stored in its own checkpointed state)
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

    val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)
    kafkaSource.print()
    env.execute()
  }
}
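
        The example above subscribes to a single topic, but the same FlinkKafkaConsumer can also be given a list of topics or a topic name pattern. A minimal sketch, reusing the prop Properties object and imports from the code above (the topic name test2 and the pattern test-.* are just placeholders):

import java.util.regex.Pattern

import scala.collection.JavaConverters._

// subscribe to several topics at once
val listConsumer =
  new FlinkKafkaConsumer[String](List("test", "test2").asJava, new SimpleStringSchema, prop)

// subscribe to every topic whose name matches a regular expression
val patternConsumer =
  new FlinkKafkaConsumer[String](Pattern.compile("test-.*"), new SimpleStringSchema, prop)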

3.3: Produce data into Kafka

        Run the following commands on node01 to produce data into Kafka from the console:

## create the topic (skip this step if the topic was already created in 3.1)
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181
## produce data
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

4: Using Kafka as a Flink sink

        Kafka can also be used as a Flink sink, i.e. the results that Flink has finished processing are written back into Kafka.

4.1: Send data over a socket

        Run the following command on node01 to open a socket and send data through it:

nc -lk 9999

4.2: Code implementation

import java.util.Properties

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * Using Kafka as a Flink sink
 */
object FlinkKafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._

    // checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // state backend: incremental RocksDB checkpoints on HDFS
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val socketStream = env.socketTextStream("node01", 9999)

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "kafka_group1")
    // With EXACTLY_ONCE the producer uses Kafka transactions, whose timeout must not
    // exceed the broker's transaction.max.timeout.ms (15 minutes by default).
    // Option 1: lower the producer's transaction timeout to 15 minutes
    prop.setProperty("transaction.timeout.ms", 60000 * 15 + "")
    // Option 2: raise transaction.max.timeout.ms in the broker's server.properties instead

    // constructor used below (exactly-once variant):
    //   defaultTopic: String,
    //   serializationSchema: KeyedSerializationSchema[IN],
    //   producerConfig: Properties,
    //   semantic: FlinkKafkaProducer.Semantic
    val kafkaSink = new FlinkKafkaProducer[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    socketStream.addSink(kafkaSink)
    env.execute("StreamingFromCollectionScala")
  }
}
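
        If end-to-end transactional writes are not required, the same sink can be created with at-least-once semantics instead, which sidesteps the Kafka transaction timeout configuration shown above. A minimal sketch, reusing topic, prop and the imports from the example above:

// at-least-once: no Kafka transactions, so transaction.timeout.ms is irrelevant
val atLeastOnceSink = new FlinkKafkaProducer[String](
  topic,
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  prop,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

socketStream.addSink(atLeastOnceSink)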

4.3: Start a Kafka consumer

        Run the following command on node01 to start a Kafka console consumer and verify the data:

kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test
