赞
踩
1.添加相关依赖
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.12</artifactId>
- <version>1.9.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.12</artifactId>
- <version>1.9.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_2.12</artifactId>
- <version>1.9.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>1.9.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>2.4.0</version>
- </dependency>
2.创建scala类,并开发代码
- package com.vincer
-
- import java.util.Properties
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema
-
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
- // flatMap和Map需要引用的隐式转换
- import org.apache.flink.api.scala._
-
- /**
-  * Minimal Flink streaming job: reads string records from the Kafka topic
-  * "test1" and prints every record it receives.
-  *
-  * @Package com.vincer
-  * @ClassName conkafka
-  * @Author Vincer
-  * @Date 2020/01/13 22:31
-  * @ProjectName kafka-flink
-  */
- object conkafka {
-
-   /**
-    * Wires a Kafka consumer source to a print sink and runs the job.
-    *
-    * @param args command-line arguments (not used)
-    */
-   def main(args: Array[String]): Unit = {
-     // Obtain the current execution environment and set the checkpoint interval.
-     val env = StreamExecutionEnvironment.getExecutionEnvironment
-     env.enableCheckpointing(5000)
-
-     // Kafka connection settings: broker address and the consumer group to join.
-     val consumerConfig = new Properties()
-     consumerConfig.setProperty("bootstrap.servers", "hadoop100:9092")
-     consumerConfig.setProperty("group.id", "group_test")
-
-     // Kafka consumer for the topic "test1"; records are decoded as plain strings.
-     val consumer = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema, consumerConfig)
-     // To start consuming from the latest offset instead, enable the next line:
-     // consumer.setStartFromLatest()
-     // Commit offsets as part of checkpointing.
-     consumer.setCommitOffsetsOnCheckpoints(true)
-
-     // Register the consumer as a source with parallelism 3 and print each record.
-     val records = env.addSource(consumer)
-     records.setParallelism(3)
-     records.print()
-
-     // Start execution under the job name "kafkawd".
-     env.execute("kafkawd")
-   }
- }
3.启动zookeeper,kafka
(过程略)
4.启动kafka的Client生产数据
bin/kafka-console-producer.sh --broker-list hadoop100:9092 --topic test1
5.运行代码
6.在kafka上生产数据,打印到IDEA的控制台
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。