赞
踩
kafka依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
核心代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

/**
 * Minimal Flink streaming job that consumes string records from a Kafka
 * topic and prints them to stdout.
 *
 * @author shkstart
 * @create 2021-07-23 8:06
 */
object Test {

  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Configure the Kafka consumer.
    //    Properties is a Java class; the connector expects it directly.
    //    "hadoop101:9092" is the Kafka broker address — adjust for your cluster.
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", "hadoop101:9092")
    consumerProps.setProperty("group.id", "consumer-group")

    // 3. Attach the Kafka source (topic "sensor", plain-string deserialization).
    val kafkaStream = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), consumerProps)
    )

    // 4. Sink: print each record to stdout.
    kafkaStream.print()

    // 5. Trigger job execution (lazy until execute() is called).
    env.execute()
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。