Flink demo: reading data from Kafka (Java)

1. Project structure

flink-demo
--src
  --main
    --java
      --com.bob.demo
        --App
        --DemoRichSink
        --FlinkDemo
    --resources
      --consumer.properties
--start.sh
--pom.xml

2. Code

2.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bob</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
    </properties>

    <dependencies>
        <!-- Flink core streaming dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
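
Note: as of Flink 1.11, flink-streaming-java no longer pulls in flink-clients transitively, so if you also want to run this job locally from the IDE (rather than only submitting it to YARN), one more dependency is needed:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>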

2.2 Configuration file consumer.properties

bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
kafka.topic=flinkdemo
group.id=demo001
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demo" password="demo";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

2.3 Launch script start.sh (submits the job to YARN)

#!/bin/bash
set -x
# Kerberos authentication
kinit -kt /home/tomcat/keytab-util/demo.keytab demo
FLINK_JAR=flink-demo.jar
FLINK_HPATH=/u01/app/flink

${FLINK_HPATH}/bin/flink run \
  -m yarn-cluster -yqu root.job \
  -ynm flink_demo -ys 5 \
  -ytm 1024m -Dtaskmanager.memory.flink.size=8192m \
  -yjm 1024m -d -c com.bob.demo.App \
  /u01/app/flink_demo/${FLINK_JAR} \
  --flink_config_path ./consumer.properties
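
Note that in per-job mode main() runs inside the flink run client JVM, so the relative path ./consumer.properties is resolved against the directory from which start.sh is invoked; keep the properties file next to the script, or pass an absolute path.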

2.4 Entry class App

package com.bob.demo;

import org.apache.flink.api.java.utils.ParameterTool;

public class App {

    public static void main(String[] args) {
        try {
            // Read the config file path from the launch arguments
            ParameterTool parameters = ParameterTool.fromArgs(args);
            String configPath = parameters.get("flink_config_path");
            // Load the job parameters (Kafka, Redis, ...) from the properties file
            ParameterTool parameterTool = ParameterTool.fromPropertiesFile(configPath);
            FlinkDemo flinkDemo = new FlinkDemo();
            flinkDemo.runApp(parameterTool);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
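
For reference, a minimal sketch of how ParameterTool.fromArgs interprets the launch arguments (the values here are illustrative):

// ParameterTool.fromArgs expects "--key value" (or "-key value") pairs,
// which is why start.sh passes --flink_config_path as one token
// immediately followed by the path.
String[] demoArgs = {"--flink_config_path", "./consumer.properties"};
ParameterTool parameters = ParameterTool.fromArgs(demoArgs);
System.out.println(parameters.get("flink_config_path")); // ./consumer.properties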

2.5 Core implementation FlinkDemo

package com.bob.demo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class FlinkDemo {

    private static final Logger logger = LoggerFactory.getLogger(FlinkDemo.class);

    public void runApp(ParameterTool parameterTool) throws Exception {
        // Kafka parameters (the key must match the one in consumer.properties)
        String topicName = parameterTool.get("kafka.topic");
        Properties properties = initKafkaProperties(parameterTool);
        // Create the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the parameters as global job parameters so sinks can read them
        env.getConfig().setGlobalJobParameters(parameterTool);
        // Build the Kafka source
        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties));
        // Sink 1
        stream.addSink(new DemoRichSink());
        // Sink 2 could be added here
        // Submit the job; without this call nothing runs
        env.execute("flink-demo");
    }

    /**
     * Initialize the Kafka consumer properties.
     */
    public Properties initKafkaProperties(ParameterTool parameterTool) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers").trim());
        properties.setProperty("group.id", parameterTool.get("group.id").trim());
        properties.setProperty("key.deserializer", parameterTool.get("key.deserializer").trim());
        properties.setProperty("value.deserializer", parameterTool.get("value.deserializer").trim());
        properties.setProperty("sasl.jaas.config", parameterTool.get("sasl.jaas.config").trim());
        properties.setProperty("security.protocol", parameterTool.get("security.protocol").trim());
        properties.setProperty("sasl.mechanism", parameterTool.get("sasl.mechanism").trim());
        return properties;
    }
}
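
One thing the demo leaves out: FlinkKafkaConsumer only commits offsets back to Kafka on completed checkpoints; without checkpointing it falls back to the Kafka consumer's periodic auto-commit. A minimal addition in runApp, right after creating the environment (the 60-second interval is an assumption, tune it to your latency needs):

// Enable checkpointing so the Kafka source commits offsets on each
// completed checkpoint instead of relying on auto-commit
env.enableCheckpointing(60_000);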

2.6 Sink logic DemoRichSink

package com.bob.demo;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DemoRichSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Read the global job parameters registered in FlinkDemo
        ParameterTool globalParams = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // TODO actual logic: write to HDFS, HBase, ES, a socket, ...
    }
}
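
As a minimal sketch of what a concrete sink might look like (this logging sink is a hypothetical stand-in; a real job would acquire its HBase/ES/socket client in open() and release it in close()):

package com.bob.demo;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example sink: logs every record it receives.
public class LoggingDemoSink extends RichSinkFunction<String> {

    private static final Logger logger = LoggerFactory.getLogger(LoggingDemoSink.class);

    @Override
    public void open(Configuration parameters) throws Exception {
        // Acquire external connections here; runs once per parallel subtask
        logger.info("sink opened, subtask {}", getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Per-record logic; here we only log the record
        logger.info("received: {}", value);
    }

    @Override
    public void close() throws Exception {
        // Release external connections here
        logger.info("sink closed");
    }
}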
