A lesson learned from a Flink pitfall
Here is the code:
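import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Create the consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hadoop-host-ip-to-connect-to>:9092");
        // properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);

        // 3. Attach the consumer to the corresponding Flink stream
        DataStreamSource<String> dataStream = env.addSource(kafkaConsumer);
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataSource =
                dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {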