当前位置:   article > 正文

【flink】之如何消费kafka数据?_flink 消费kafka消息

flink 消费kafka消息

为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。

 1.环境准备

确保你已经安装了Apache Kafka和Apache Flink,并且Kafka正在运行。Kafka的默认端口是9092,而Zookeeper(Kafka依赖的服务)的默认端口是2181

2.Maven项目设置

创建一个新的Maven项目,并在pom.xml中添加以下依赖:

  1. <dependencies>
  2. <!-- Flink dependencies -->
  3. <dependency>
  4. <groupId>org.apache.flink</groupId>
  5. <artifactId>flink-streaming-java_2.12</artifactId>
  6. <version>1.13.2</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-connector-kafka_2.12</artifactId>
  11. <version>1.13.2</version>
  12. </dependency>
  13. <!-- Kafka client dependency -->
  14. <dependency>
  15. <groupId>org.apache.kafka</groupId>
  16. <artifactId>kafka-clients</artifactId>
  17. <version>2.8.0</version>
  18. </dependency>
  19. <!-- Logging -->
  20. <dependency>
  21. <groupId>org.slf4j</groupId>
  22. <artifactId>slf4j-log4j12</artifactId>
  23. <version>1.7.30</version>
  24. </dependency>
  25. </dependencies>

注意:请根据你使用的Scala或Java版本以及Flink和Kafka的版本调整上述依赖。

3.编写Flink Kafka Consumer代码

  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  5. import java.util.Properties;
  6. public class FlinkKafkaConsumerDemo {
  7. public static void main(String[] args) throws Exception {
  8. // 设置执行环境
  9. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. // Kafka消费者属性
  11. Properties props = new Properties();
  12. props.put("bootstrap.servers", "localhost:9092");
  13. props.put("group.id", "test-group");
  14. props.put("enable.auto.commit", "true");
  15. props.put("auto.commit.interval.ms", "1000");
  16. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  18. // 创建Kafka消费者
  19. FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
  20. "input-topic", // Kafka topic
  21. new SimpleStringSchema(), // 反序列化器
  22. props);
  23. // 添加数据源
  24. DataStream<String> stream = env.addSource(myConsumer);
  25. // 数据处理
  26. stream.map(new MapFunction<String, String>() {
  27. @Override
  28. public String map(String value) throws Exception {
  29. return "Received: " + value;
  30. }
  31. }).print();
  32. // 执行流程序
  33. env.execute("Flink Kafka Consumer Example");
  34. }
  35. // 简单的字符串反序列化器
  36. public static final class SimpleStringSchema implements DeserializationSchema<String> {
  37. @Override
  38. public String deserialize(byte[] message) throws IOException {
  39. return new String(message, "UTF-8");
  40. }
  41. @Override
  42. public boolean isEndOfStream(String nextElement) {
  43. return false;
  44. }
  45. @Override
  46. public TypeInformation<String> getProducedType() {
  47. return BasicTypeInfo.STRING_TYPE_INFO;
  48. }
  49. }
  50. }

4.执行程序

  1. 确保Kafka正在运行,并且有一个名为input-topic的topic(如果没有,你需要先创建它)。
  2. 编译并运行你的Maven项目
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/888202
推荐阅读
相关标签
  

闽ICP备14008679号