赞
踩
- flink-demo
- --src
- --main
- --java
- --com.bob.demo
- --App
- --DemoRichSink
- --FlinkDemo
- --resources
- --consumer.properties
- --start.sh
- --pom.xml
2.1 pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <!-- Maven build descriptor for the flink-demo job. -->
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.bob</groupId>
- <artifactId>flink-demo</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <!-- Java source/target level and the single Flink version shared by the dependencies below. -->
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <flink.version>1.11.2</flink.version>
- </properties>
-
- <!-- NOTE(review): no shade/assembly plugin is configured, so these dependencies are
- presumably expected on the cluster classpath; confirm how flink-demo.jar is packaged. -->
- <dependencies>
- <!-- Flink core dependency (streaming API) -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <!-- Kafka connector; FlinkDemo imports FlinkKafkaConsumer from it -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- </dependencies>
-
- </project>
- # Job settings for the demo; App loads this file with ParameterTool.fromPropertiesFile.
- bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
- # Topic to consume (an application setting, not a Kafka client option).
- kafka.topic=flinkdemo
- group.id=demo001
- key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- # SCRAM authentication: the JAAS entry must name the ScramLoginModule login module
- # and pass the credentials as quoted username/password options.
- sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demo" password="demo";
- security.protocol=SASL_PLAINTEXT
- sasl.mechanism=SCRAM-SHA-256
- #!/bin/bash
- # Submits the flink-demo job to YARN (yarn-cluster mode, detached).
-
- set -x
-
- # Obtain a Kerberos ticket from the keytab before submitting.
- kinit -kt /home/tomcat/keytab-util/demo.keytab demo
-
- FLINK_JAR=flink-demo.jar
- FLINK_HPATH=/u01/app/flink
-
- # Everything after the jar path is handed to com.bob.demo.App as program arguments.
- # ParameterTool.fromArgs only accepts keys written as "--key value" (or "-key value"),
- # so the dashes must be attached to flink_config_path; a lone "--" is rejected.
- # NOTE(review): taskmanager.memory.flink.size=8192 has no unit while -ytm is 1024m;
- # confirm the intended TaskManager memory size.
- ${FLINK_HPATH}/bin/flink run \
-     -m yarn-cluster -yqu root.job \
-     -ynm flink_demo -ys 5 \
-     -ytm 1024m -Dtaskmanager.memory.flink.size=8192 \
-     -yjm 1024m -d -c com.bob.demo.App \
-     /u01/app/flink_demo/${FLINK_JAR} \
-     --flink_config_path ./consumer.properties
- package com.bob.demo;
-
- import org.apache.flink.api.java.utils.ParameterTool;
-
- /**
-  * Command-line entry point of the demo job.
-  *
-  * <p>Reads the path of a properties file from the {@code flink_config_path}
-  * program argument, loads it, and hands the resulting settings to {@link FlinkDemo}.
-  */
- public class App {
-     /**
-      * Loads the job configuration and starts the job.
-      *
-      * @param args program arguments; must contain {@code --flink_config_path <file>}
-      * @throws Exception if the argument is missing, the file cannot be read, or
-      *                   {@link FlinkDemo#runApp} fails; the error is propagated so the
-      *                   launcher reports the failure instead of it being swallowed
-      */
-     public static void main(String[] args) throws Exception {
-         // Read the location of the configuration file from the launch arguments.
-         ParameterTool parameters = ParameterTool.fromArgs(args);
-         // getRequired fails with a message naming the key rather than passing null on.
-         String configPath = parameters.getRequired("flink_config_path");
-         // Load the job settings (Kafka connection etc.) from that file.
-         ParameterTool parameterTool = ParameterTool.fromPropertiesFile(configPath);
-         FlinkDemo flinkDemo = new FlinkDemo();
-         flinkDemo.runApp(parameterTool);
-     }
- }
- package com.bob.demo;
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.Properties;
-
- /**
-  * Builds and runs the demo pipeline: a Kafka topic read as strings and written
-  * to {@link DemoRichSink}.
-  */
- public class FlinkDemo {
-     private static final Logger logger = LoggerFactory.getLogger(FlinkDemo.class);
-
-     /** Configuration key that holds the topic to consume. */
-     private static final String TOPIC_KEY = "kafka.topic";
-
-     /** Configuration keys copied into the Kafka consumer properties, in order. */
-     private static final String[] KAFKA_KEYS = {
-         "bootstrap.servers",
-         "kafka.topic",
-         "group.id",
-         "key.deserializer",
-         "value.deserializer",
-         "sasl.jaas.config",
-         "security.protocol",
-         "sasl.mechanism",
-     };
-
-     /**
-      * Wires the Kafka source to the sink and submits the job.
-      *
-      * @param parameterTool job settings; must contain every key listed in
-      *                      {@code KAFKA_KEYS}
-      * @throws Exception if a required setting is missing or job execution fails
-      */
-     public void runApp(ParameterTool parameterTool) throws Exception {
-         // Kafka settings. The topic is read from the same key initKafkaProperties uses.
-         String topicName = parameterTool.getRequired(TOPIC_KEY).trim();
-         Properties properties = initKafkaProperties(parameterTool);
-         // Create the Flink execution environment.
-         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-         // Publish the settings as global job parameters so functions can read them.
-         env.getConfig().setGlobalJobParameters(parameterTool);
-         // Build the source.
-         DataStreamSource<String> stream = env.addSource(
-                 new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties));
-         // Build sink 1. Wiring errors propagate to the caller instead of being swallowed.
-         stream.addSink(new DemoRichSink());
-         // Build sink 2 (placeholder; none defined yet).
-
-         // Nothing runs until execute() is called; without it the pipeline is only declared.
-         logger.info("Submitting job for topic {}", topicName);
-         env.execute("flink_demo");
-     }
-
-     /**
-      * Initializes the Kafka consumer properties from the job settings.
-      *
-      * @param parameterTool job settings to copy from
-      * @return properties holding the trimmed value of every key in {@code KAFKA_KEYS}
-      * @throws RuntimeException if one of the keys is absent from {@code parameterTool}
-      */
-     public Properties initKafkaProperties(ParameterTool parameterTool) {
-         Properties properties = new Properties();
-         for (String key : KAFKA_KEYS) {
-             // getRequired names the missing key instead of failing with a bare NPE on trim().
-             properties.setProperty(key, parameterTool.getRequired(key).trim());
-         }
-         return properties;
-     }
- }
- package com.bob.demo;
-
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
- import java.io.Serializable;
-
- /**
-  * Skeleton sink for the demo job; the actual output logic is still to be written.
-  */
- public class DemoRichSink extends RichSinkFunction<String> implements Serializable {
-
-     /**
-      * Handles one record. At present it only looks up the global job parameters.
-      *
-      * @param value   the record to write
-      * @param context sink context supplied by the caller
-      * @throws Exception declared by the overridden method
-      */
-     @Override
-     public void invoke(String value, Context context) throws Exception {
-         // Look up the global job parameters registered on the execution config.
-         final Object rawParams = getRuntimeContext()
-                 .getExecutionConfig()
-                 .getGlobalJobParameters();
-         final ParameterTool jobParams = (ParameterTool) rawParams;
-         // TODO actual logic: write to HDFS, HBase, ES, a socket, ...
-     }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。