赞
踩
object KafkaSource { def main(args: Array[String]): Unit = { //创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //配置kafka val properties = new Properties() properties.setProperty("bootstrap.servers", "hadoop102:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") //添加kafkasource val stream:DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sensor",new SimpleStringSchema(),properties)) //打印数据流 stream.print("stream:").setParallelism(1) //开启job env.execute("stream") } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。