当前位置:   article > 正文

Flink框架关于Kafka生产消费的实现demo_flink 消费kafka demo

flink 消费kafka demo

flink作为一个大数据框架,已经由阿里充分的证实了其性能和前景。但对国内仍然是一个比较陌生的状态,无论是开源的文档和实例都比较缺乏。之前找到的demo很多都是旧版本;同时flink本身面临版本演进,blink开源等一些影响,也会在之后出现一些比较大的变化。

我根据目前的资料,编写了基于flink的Kafka生产消费demo,便于初步的了解flink的特性,也希望能对他人了解flink提供一些小小的帮助。

以下demo是我根据一些其他的demo整合而成,引用申明见最下方。

 


导包

flink的导包是常见的核心+拓展模式

flink-java是flink的核心实现.我使用的是(看起来比较成熟的1.5.0,可能1.6.0更合适)

flink-streaming-java_2.11是使用flink流接入的包

flink-connector-kafka-0.10_2.11是为了使用Kafka开发的中间件,而使用这个包必须保证版号对应,及使用kafka0.10和flink流2.11

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.5.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.11</artifactId>
  9. <version>1.5.0</version>
  10. </dependency>
  11. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
  12. <dependency>
  13. <groupId>org.apache.flink</groupId>
  14. <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  15. <version>1.5.0</version>
  16. </dependency>

写入(生产者)

源为一个循环的数据产生器,输出为对应的kafka topic

  1. public class WriteIntoKafka {
  2. public static void main(String[] args) throws Exception {
  3. // create execution environment
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. Map properties= new HashMap();
  6. properties.put("bootstrap.servers", "/*服务地址*/");
  7. properties.put("topic", "/*topic*/");
  8. // parse user parameters
  9. ParameterTool parameterTool = ParameterTool.fromMap(properties);
  10. // add a simple source which is writing some strings
  11. DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
  12. // write stream to Kafka
  13. messageStream.addSink(new FlinkKafkaProducer010<>(parameterTool.getRequired("bootstrap.servers"),
  14. parameterTool.getRequired("topic"),
  15. new SimpleStringSchema()));
  16. messageStream.rebalance().map(new MapFunction<String, String>() {
  17. //序列化设置
  18. private static final long serialVersionUID = 1L;
  19. @Override
  20. public String map(String value) throws Exception {
  21. return value;
  22. }
  23. });
  24. messageStream.print();
  25. env.execute();
  26. }
  27. public static class SimpleStringGenerator implements SourceFunction<String> {
  28. //序列化设置
  29. private static final long serialVersionUID = 1L;
  30. boolean running = true;
  31. @Override
  32. public void run(SourceContext<String> ctx) throws Exception {
  33. while(running) {
  34. ctx.collect(prouderJson());
  35. }
  36. }
  37. @Override
  38. public void cancel() {
  39. running = false;
  40. }
  41. }
  42. }

读取(消费者)

采用Kafka消费者作为源,通过MapFunction转换后输出

  1. public class ReadFromKafka {
  2. public static void main(String[] args) throws Exception {
  3. // create execution environment
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. Map properties= new HashMap();
  6. properties.put("bootstrap.servers", "/*服务地址*/");
  7. properties.put("group.id", "test");
  8. properties.put("enable.auto.commit", "true");
  9. properties.put("auto.commit.interval.ms", "1000");
  10. properties.put("auto.offset.reset", "earliest");
  11. properties.put("session.timeout.ms", "30000");
  12. properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  13. properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14. properties.put("topic", "/*topic*/");
  15. // parse user parameters
  16. ParameterTool parameterTool = ParameterTool.fromMap(properties);
  17. FlinkKafkaConsumer010 consumer010 = new FlinkKafkaConsumer010(
  18. parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
  19. DataStream<String> messageStream = env
  20. .addSource(consumer010);
  21. // print() will write the contents of the stream to the TaskManager's standard out stream
  22. // the rebelance call is causing a repartitioning of the data so that all machines
  23. // see the messages (for example in cases when "num kafka partitions" < "num flink operators"
  24. messageStream.rebalance().map(new MapFunction<String, String>() {
  25. private static final long serialVersionUID = 1L;
  26. @Override
  27. public String map(String value) throws Exception {
  28. return value;
  29. }
  30. });
  31. messageStream.print();
  32. env.execute();
  33. }
  34. }

细节

1,配置我采用了 Map properties= new HashMap(); 这种在程序内部写入的方式。

也可以采用读取文件的配置的方式。

3,另一种配置方式,可以对FlinkKafkaConsumer010进行配置,具体使用待补充

采用Kafka为data Source使用

2,新旧版本的最大区别就是 使用FlinkKafkaProducer010和FlinkKafkaProducer010作为生产者和消费者的实例,具体方法的使用和之前的版本有些差别

4,生产者源的产生方式是值得研究的,也应该有其他的产生方式。很多实现细节没有弄清楚(待补充)

5.消费者转换的MapFunction感觉仍有不清楚的地方,这一步可以实现什么逻辑,适合写什么样的逻辑呢(待补充)

6,输出细节,可以在 实现中输出,也可以使用messageStream.print(),进行输出,具体格式可以自行验证。


问题

以上两个demo是调用flink—Kafka的api实现的,但是仍有一些的问题,没有弄清:

1,关于生产者,如何自定义发送数据格式,如byte[],如何自定义发送的分区

2,关于消费者kafka低级api的使用,seek2end,拉取指定分区,手动提交偏移量等设置的实现


引用

https://www.jianshu.com/p/f9d447a3c48f

https://blog.csdn.net/lmalds/article/details/51780950

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

github中的某段代码(找了很久,忘了出处了)


补充

10.28

最近把一些用过的demo放在了GitHub上,可以看下:https://github.com/jyj019/flinkDemo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/836412
推荐阅读
相关标签
  

闽ICP备14008679号