
Integrating Kafka with Flink: an API Tutorial

1. Add the dependencies (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>
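
Note: the log4j.properties created in step 2 below is in Log4j 2 format, so it only takes effect when a Log4j 2 backend is on the classpath. If no logging shows up when running from the IDE, a common optional addition is the following pair of runtime dependencies (the versions are only illustrative, not from the original tutorial):

<!-- optional: Log4j 2 backend so the log4j.properties from step 2 is picked up in IDE runs -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.17.1</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.17.1</version>
    <scope>runtime</scope>
</dependency>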

2. Create the logging configuration file

Copy the contents of $FLINK_HOME/conf/log4j.properties into the project's log4j.properties (typically under src/main/resources):

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
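
As the comments in the file note, per-library log levels are adjusted here manually. For example, an optional tweak (not part of the original file) to quiet the Kafka client while keeping everything else at INFO would be:

# optional: reduce Kafka client log noise (overrides the INFO setting above)
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN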

3. Flink producer API

package com.ljr.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.Properties;

public class MyFlinkKafkaProducer {
    // tip: typing "main" followed by Tab in the IDE generates the main method
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism to match the number of topic partitions
        env.setParallelism(3);

        // 2. Prepare the data source
        ArrayList<String> wordlist = new ArrayList<>();
        wordlist.add("zhangsan");
        wordlist.add("lisi");
        DataStreamSource<String> stream = env.fromCollection(wordlist);

        // create the Kafka producer (sink)
        Properties pros = new Properties();
        pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
        FlinkKafkaProducer<String> kafkaProducer =
                new FlinkKafkaProducer<>("customers", new SimpleStringSchema(), pros);

        // 3. Attach the Kafka sink to the stream
        stream.addSink(kafkaProducer);

        // 4. Execute the job
        env.execute();
    }
}

Run the job; the records can then be seen by consuming the customers topic with a Kafka consumer.
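
For reference, here is a minimal sketch of an alternative sink constructor that takes a KafkaSerializationSchema plus an explicit delivery semantic. It is not part of the original tutorial; it assumes the same customers topic and broker list as above, and the class name is made up.

package com.ljr.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;

public class MyFlinkKafkaProducerAtLeastOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStreamSource<String> stream = env.fromCollection(Arrays.asList("zhangsan", "lisi"));

        Properties pros = new Properties();
        pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");

        // The schema builds the ProducerRecord itself, and the Semantic argument
        // selects the delivery guarantee (EXACTLY_ONCE would also need checkpointing).
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "customers",
                (KafkaSerializationSchema<String>) (element, timestamp) ->
                        new ProducerRecord<>("customers", element.getBytes(StandardCharsets.UTF_8)),
                pros,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        stream.addSink(kafkaProducer);
        env.execute();
    }
}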

4. Flink consumer API

package com.ljr.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyFlinkKafkaConsumer {
    public static void main(String[] args) throws Exception {
        // 1. Initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2. Create the consumer
        Properties pros = new Properties();
        pros.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
        //pros.put(ConsumerConfig.GROUP_ID_CONFIG, "hh");
        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                new FlinkKafkaConsumer<>("customers", new SimpleStringSchema(), pros);

        // 3. Attach the consumer as the source of the Flink stream and print it
        env.addSource(flinkKafkaConsumer).print();

        // 4. Execute the job
        env.execute();
    }
}

Run it, then produce data with the producer from step 3 and watch the consumed records being printed.
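
For reference, a minimal variant sketch (not in the original tutorial; the group id "flink-demo" is made up) showing how the same consumer can be given a consumer group and an explicit start position:

package com.ljr.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyFlinkKafkaConsumerFromEarliest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        Properties pros = new Properties();
        pros.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
        // assumed group id, used for offset tracking
        pros.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-demo");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("customers", new SimpleStringSchema(), pros);
        // re-read the topic from the beginning instead of starting from group offsets
        consumer.setStartFromEarliest();

        env.addSource(consumer).print();
        env.execute();
    }
}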
