In real-time processing, the data source used in most production systems is Kafka, so let's look at how to integrate Kafka with Flink.
Flink provides a dedicated Kafka connector for reading from and writing to Kafka topics. When consuming from Kafka, Flink does not rely on the offsets committed to the Kafka consumer group to guarantee exactly-once semantics; instead, it tracks the offsets internally and stores them in its checkpoints. Kafka partitions are distributed across the parallel instances of the Flink source, so each partition is consumed by a source subtask.
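As a quick illustration of this offset handling, the consumer's start position and its (purely informational) committing of offsets back to the consumer group can be configured on the FlinkKafkaConsumer itself. The snippet below is a minimal sketch, assuming the topic test and the broker/group settings used later in this article:

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val props = new Properties()
props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
props.setProperty("group.id", "con1")

val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)
// Where to start reading when the job has no checkpoint/savepoint to restore from:
consumer.setStartFromGroupOffsets()   // default: use the group's committed offsets
// consumer.setStartFromEarliest()    // or: ignore committed offsets and read from the beginning
// consumer.setStartFromLatest()      // or: read only new records
// Commit offsets back to Kafka when a checkpoint completes. This is for monitoring only;
// recovery relies on the offsets stored in Flink's checkpoints.
consumer.setCommitOffsetsOnCheckpoints(true)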
Official documentation for Flink's Kafka connector:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
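For projects built with sbt rather than Maven, the same dependencies could be declared roughly as follows (a sketch, assuming scalaVersion is set to 2.11.x so that %% resolves to the _2.11 artifacts above):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-kafka"      % "1.9.2",
  "org.apache.flink" %% "flink-statebackend-rocksdb" % "1.9.2",
  "org.apache.kafka" %  "kafka-clients"              % "1.1.0",
  "org.slf4j"        %  "slf4j-api"                  % "1.7.25",
  "org.slf4j"        %  "slf4j-log4j12"              % "1.7.25"
)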
In practice, Kafka is most often used as the source for Flink jobs.
Install and start the Kafka cluster, then run the following command on node01 to create a Kafka topic named test:
kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
import java.util.Properties
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * Use Kafka as a Flink source.
 */
object FlinkKafkaSource {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._

    // checkpoint configuration: checkpoint every 100 ms with exactly-once mode
    env.enableCheckpointing(100)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // state backend: RocksDB with checkpoints on HDFS (incremental checkpoints enabled)
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "con1")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, prop)
    // commit offsets back to Kafka when a checkpoint completes (for monitoring only;
    // recovery relies on the offsets stored in Flink's checkpoints)
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

    val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)
    kafkaSource.print()
    env.execute()
  }
}
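As noted above, Kafka partitions are spread across the parallel source subtasks. If desired, the source parallelism can be matched to the topic's partition count (3 in this example). A minimal sketch, reusing env and kafkaConsumer from the code above:

// one source subtask per partition of the 3-partition "test" topic
val partitionedSource: DataStream[String] = env
  .addSource(kafkaConsumer)
  .setParallelism(3)

If the parallelism is higher than the number of partitions, the extra subtasks simply stay idle.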
Run the following commands on node01 to produce data into Kafka from the shell command line:
## create the topic (only needed if it was not already created above)
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181

## send data
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
Kafka can also be used as a Flink sink, that is, the results of Flink's processing are written back into Kafka.
Run the following command on node01 to send data over a socket:
nc -lk 9999
import java.util.Properties
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * Use Kafka as a Flink sink.
 */
object FlinkKafkaSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._

    // checkpoint configuration
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // state backend: RocksDB with checkpoints on HDFS (incremental checkpoints enabled)
    env.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints", true))

    val socketStream = env.socketTextStream("node01", 9999)
    val topic = "test"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "kafka_group1")

    // EXACTLY_ONCE uses Kafka transactions. The producer's transaction timeout must not
    // exceed the broker's transaction.max.timeout.ms (15 minutes by default), while the
    // Flink connector sets transaction.timeout.ms to 1 hour by default. Two ways to fix this:
    // Option 1: lower the producer's transaction timeout
    prop.setProperty("transaction.timeout.ms", 60000 * 15 + "")
    // Option 2: raise transaction.max.timeout.ms in the brokers' server.properties instead

    // simplest constructor, without exactly-once guarantees:
    // val myProducer = new FlinkKafkaProducer[String](brokerList, topic, new SimpleStringSchema())

    // exactly-once producer; the constructor used here takes:
    /**
     * defaultTopic: String,
     * serializationSchema: KeyedSerializationSchema[IN] (here a SimpleStringSchema wrapped below),
     * producerConfig: Properties,
     * semantic: FlinkKafkaProducer.Semantic
     */
    val kafkaSink = new FlinkKafkaProducer[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    socketStream.addSink(kafkaSink)
    env.execute("FlinkKafkaSink")
  }
}
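The connector also offers a newer constructor that takes a KafkaSerializationSchema instead of the wrapped KeyedSerializationSchema used above. A minimal sketch of that variant, reusing topic and prop from the example (the class name StringKafkaSchema is just an illustrative choice):

import java.lang.{Long => JLong}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// serialize each String into a ProducerRecord targeted at the given topic
class StringKafkaSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes("UTF-8"))
}

val kafkaSink = new FlinkKafkaProducer[String](
  topic,                                    // default topic
  new StringKafkaSchema(topic),             // KafkaSerializationSchema
  prop,                                     // producer properties (including transaction.timeout.ms)
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // transactional, exactly-once writes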
Run the following command on node01 to start a Kafka console consumer and read the data. Note that with the EXACTLY_ONCE semantic the writes are transactional: a consumer using isolation.level=read_committed only sees a record after the Flink checkpoint covering it completes, whereas the console consumer below uses the default read_uncommitted and shows records immediately.
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test