赞
踩
导入 Maven 依赖
<!-- Flink core Java API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- Flink DataStream API (Scala 2.12 build) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- Flink client classes (Scala 2.12 build) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- Flink Kafka connector providing FlinkKafkaProducer / FlinkKafkaConsumer -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
生产者推送消息
public static void main(String[] args) throws Exception { //kafka配置 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); //构建发送数据 DataStreamSource<String> messages= env.env.fromElements("1","2","3"); //将数据发送到frist主题 FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("frist",new SimpleStringSchema(),props); messages.addSink(producer); env.execute(); }
从 Kafka 消费消息
public static void main(String[] args) throws Exception { //kafka配置 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "10000"); props.put("max.poll.interval.ms", "600000"); props.put("max.poll.records", "100"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); //从frist消费数据,可以消费多个主题List<String> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("frist",new SimpleStringSchema(),props); env.addSource(consumer).print(); env.execute(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。