To summarize up front, here are the three common ways to create a Flink Kafka consumer: simple mode, setting the consumption start position with a custom deserialization schema, and subscribing to multiple topics. All three are shown in the utility class below.
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Properties;

/**
 * @author daqu
 * @date 2022/6/13 15:22
 * @description Three common ways to create a FlinkKafkaConsumer
 */
@Slf4j
public class KafkaConsumerUtils {

    // 1) Simple mode: a single topic with SimpleStringSchema
    public static FlinkKafkaConsumer<String> createKafkaConsumer(StreamExecutionEnvironment env, String topic, String groupID) throws Exception {

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.1.178:9092,192.168.1.179:9092,192.168.1.180:9092");
        prop.setProperty("group.id", groupID);
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");

        SimpleStringSchema schema = new SimpleStringSchema();
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, schema, prop);
        // Commit offsets back to Kafka when a checkpoint completes (only takes effect if checkpointing is enabled)
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }

    // 2) Custom deserialization schema plus an explicit start position
    public static <T> FlinkKafkaConsumer<T> flinkKafkaConsumerSerDe(StreamExecutionEnvironment env, String topic, String groupID, KafkaDeserializationSchema<T> schema, StartupMode sm) throws IOException {

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.1.178:9092,192.168.1.179:9092,192.168.1.180:9092");
        prop.setProperty("group.id", groupID);
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");

        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer<>(topic, schema, prop);
        // Choose where to start consuming from
        if (sm.equals(StartupMode.EARLIEST)) {
            consumer.setStartFromEarliest();
        } else if (sm.equals(StartupMode.LATEST)) {
            consumer.setStartFromLatest();
        }
        // Commit offsets back to Kafka when a checkpoint completes
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }

    // 3) Subscribe to multiple topics
    public static FlinkKafkaConsumer<String> multiTopicKafkaConsumer(StreamExecutionEnvironment env, String groupID) throws IOException {

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.1.178:9092,192.168.1.179:9092,192.168.1.180:9092");
        prop.setProperty("group.id", groupID);
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");

        LinkedList<String> topics = new LinkedList<>();
        topics.add("test01");
        topics.add("test02");

        return new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), prop);
    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String topic = "test01";
        String groupId = "kafka-group";

        // Way 1: simple mode
        FlinkKafkaConsumer<String> consumer = KafkaConsumerUtils.createKafkaConsumer(env, topic, groupId);
        env.addSource(consumer).print();

        // Way 2: custom deserialization schema and explicit start position
        KafkaDeserializationSchemaWrapper<String> schemaWrapper = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
        FlinkKafkaConsumer<String> consumer1 = KafkaConsumerUtils.flinkKafkaConsumerSerDe(env, topic, groupId, schemaWrapper, StartupMode.LATEST);
        env.addSource(consumer1).print();

        // Way 3: multiple topics
        FlinkKafkaConsumer<String> consumer2 = KafkaConsumerUtils.multiTopicKafkaConsumer(env, groupId);
        env.addSource(consumer2).print();

        env.execute();
    }
}
```
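One caveat: `setCommitOffsetsOnCheckpoints(true)` only commits offsets back to Kafka when checkpointing is enabled on the execution environment; the `main` method above never enables it, so offset committing would fall back to the Kafka client's `enable.auto.commit` / `auto.commit.interval.ms` settings. Below is a minimal sketch of wiring that in; the 5-second interval, the exactly-once mode, and the job name are illustrative values, not from the original post.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without this call, setCommitOffsetsOnCheckpoints(true) has no effect and the
        // consumer relies on the Kafka client's enable.auto.commit behaviour instead.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // 5s interval: illustrative value
        env.addSource(KafkaConsumerUtils.createKafkaConsumer(env, "test01", "kafka-group")).print();
        env.execute("kafka-consumer-with-checkpointing");
    }
}
```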