当前位置:   article > 正文

Flink集成kafka消费数据和推送消息_flink 推送

flink 推送

导入maven依赖

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>1.13.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.12</artifactId>
	<version>1.13.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-clients_2.12</artifactId>
	<version>1.13.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.12</artifactId>
	<version>1.13.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

生产者推送消息

 public static void main(String[] args) throws Exception {
        //kafka配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		//获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        //构建发送数据
         DataStreamSource<String> messages= env.env.fromElements("1","2","3");
         //将数据发送到frist主题
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("frist",new SimpleStringSchema(),props);
        messages.addSink(producer);
        env.execute();
}      
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

从kafka消费消息

public static void main(String[] args) throws Exception {
        //kafka配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "10000");
        props.put("max.poll.interval.ms", "600000");
        props.put("max.poll.records", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		//获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        //从frist消费数据,可以消费多个主题List<String>
       FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("frist",new SimpleStringSchema(),props);
        env.addSource(consumer).print();
        env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/888212
推荐阅读
相关标签
  

闽ICP备14008679号