当前位置:   article > 正文

flink从kafka读取数据并传到mysql数据库_flink 读取数据库数据 再保存到数据库

flink 读取数据库数据 再保存到数据库

记一次flink踩坑教训

上代码

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception{
        //1.获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.创建消费者
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"需要连接的hadoopIP:9092");
//        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties);
        //3.消费者与对应的flink流关联
        DataStreamSource<String> dataStream = env.addSource(kafkaConsumer);
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataSource = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/892036
推荐阅读
相关标签
  

闽ICP备14008679号