Compared with Storm & Spark, Flink may not be as widely used in industry as the former two, as can be seen from search-engine hit counts and the quality of online articles: the related concepts are explained in a rather muddled way, and the official documentation is not especially clear either.
Flink cluster deployment modes:
Standalone (simplest; its drawbacks are weak resource isolation and uneven job deployment across nodes), YARN (relatively mature, commonly used in production environments), Mesos, and Docker.
This article focuses on Flink on YARN. In this mode the JobManager is a single point of failure, which is usually addressed with HA; without HA this mode needs essentially no installation or configuration: just download the release from the official site, extract it, and place it on each node.
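For reference, a minimal sketch of the HA-related settings in conf/flink-conf.yaml; the ZooKeeper hosts and HDFS path below are placeholders, not taken from the cluster used in this article:
# hypothetical HA settings for Flink on YARN (adjust hosts/paths to your cluster)
high-availability: zookeeper
high-availability.zookeeper.quorum: zknode1:2181,zknode2:2181,zknode3:2181
high-availability.storageDir: hdfs:///flink/ha/
# let YARN restart the ApplicationMaster (JobManager) after a failure
yarn.application-attempts: 10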
There are two ways to submit jobs with Flink on YARN (this is the key point, since the related concepts tend to be explained unclearly); option 2 is recommended for production.
1. yarn session (Start a long-running Flink cluster on YARN): there are two ways to start the cluster, the only difference being the -d flag; with -d the client exits after submitting the Flink cluster to YARN. #ENV VAR MUST BE SET: YARN_CONF_DIR or HADOOP_CONF_DIR
----1.1 Client (attached) mode:
./bin/yarn-session.sh -jm 500 -tm 500 #jm & tm both default to 1G; HADOOP_CLASSPATH must be set
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://ip:9000/LICENSE -output hdfs://ip:9000/wordcount-result_1.txt
----1.2 Detached mode #i.e. detach, the -d flag
./bin/yarn-session.sh -nm yarn-session_test -d
2. flink run (Run a Flink job on YARN) #the -d flag can also be added here
./bin/flink run -m yarn-cluster -yjm 500m -ytm 500m /examples/batch/WordCount.jar
How to stop:
echo "stop" | ./bin/yarn-session.sh -id .
or yarn application -kill #cannot clean up all the files left behind by the jobs
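For a detached session, the application ID is printed when the session is submitted and can also be looked up later. A short sketch of a clean shutdown; the application ID below is only a placeholder:
# find the YARN application ID of the running Flink session
yarn application -list
# ask the session cluster to shut down (replace the placeholder ID with the real one)
echo "stop" | ./bin/yarn-session.sh -id application_0000000000000_0001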
If you install a Flink build that bundles the Hadoop jars (recent releases no longer ship them), no configuration is needed: extract the downloaded archive and place it on each node. Otherwise, and especially with the latest versions, you need to set HADOOP_CLASSPATH, or copy the required Hadoop jars into Flink's lib directory (mainly when there are jar conflicts).
export HADOOP_CLASSPATH=/usr/local/hadoop-3.1.3/share/hadoop/common/lib/*:/usr/local/hadoop-3.1.3/share/hadoop/yarn/*:/usr/local/hadoop-3.1.3/share/hadoop/hdfs/*
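A simpler alternative, assuming the hadoop executable is on the PATH, is to let Hadoop assemble the classpath itself:
export HADOOP_CLASSPATH=$(hadoop classpath)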
K2FlinkDemo.java
package org.ips.demo;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class K2FlinkDemo {

    private static final String READ_TOPIC = "IPS_CASH";
    private static final String WRITER_TOPIC = "IPS_CASH_RESULT";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.101:9092");
        //props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "flink-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        // Read lines such as "aaa 100" from the IPS_CASH topic
        DataStreamSource<String> dssk = env
                .addSource(new FlinkKafkaConsumer<>(READ_TOPIC, new SimpleStringSchema(), props))
                .setParallelism(1);

        // Split each line into (key, value), key by the first field and sum over a 60s tumbling window
        DataStream<Tuple2<String, Integer>> counts = dssk
                .flatMap(new LineSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(60), Time.seconds(60))
                .sum(1);

        counts.print();
        counts.addSink(new MySQLSink());

        env.execute("flink custom kafka");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        //private static final long serialVersionUID = 1L;
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Expect "<key> <number>"; fall back to 0 when the second field is not a number
            String[] tokens = value.toLowerCase().split("\\W+");
            if (tokens.length > 1) {
                try {
                    out.collect(new Tuple2<String, Integer>(tokens[0], Integer.parseInt(tokens[1])));
                } catch (NumberFormatException e) {
                    out.collect(new Tuple2<String, Integer>(tokens[0], 0));
                }
            }
        }
    }
}
MySQLSink.java
package org.ips.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {

    //private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;

    String username = "root";
    String password = "Msql#1234";
    String drivername = "com.mysql.cj.jdbc.Driver";
    String dburl = "jdbc:mysql://192.168.100.102:3310/test";

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        // Open a connection, insert one row, then close; simple, but costly per record
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        String sql = "insert into test1(ky,vl) values(?,?)";
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, value.f0);
        preparedStatement.setInt(2, value.f1);
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
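Opening and closing a JDBC connection for every record is fine for a demo but expensive in practice. Below is a minimal sketch of an alternative that reuses one connection per subtask through the RichSinkFunction open()/close() lifecycle; the class name is hypothetical and the URL/credentials simply mirror the ones above, so treat it as an illustration rather than the code run in this article.

package org.ips.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical variant of MySQLSink that keeps one connection per parallel subtask
public class PooledMySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open the connection once when the subtask starts
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.100.102:3310/test", "root", "Msql#1234");
        statement = connection.prepareStatement("insert into test1(ky,vl) values(?,?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        // Reuse the prepared statement for every record
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources when the subtask shuts down
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}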
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.ips</groupId>
  <artifactId>demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>demo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.10.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>6.0.2</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <finalName>K2FlinkDemo</finalName>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>org.ips.demo.K2FlinkDemo</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
[root@ipsnode1 flink-1.10.1]# ./bin/flink run -m yarn-cluster /home/test/K2Flink2K.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/flink-1.10.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2020-04-17 22:39:08,947 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at /0.0.0.0:10200
2020-04-17 22:39:08,976 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-04-17 22:39:09,382 INFO org.apache.hadoop.conf.Configuration - resource-types.xml not found
2020-04-17 22:39:09,383 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils - Unable to find 'resource-types.xml'.
2020-04-17 22:39:09,451 INFO org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2020-04-17 22:39:11,137 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:11,582 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:11,724 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:12,560 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:13,105 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:13,177 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:13,568 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:14,824 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:17,637 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:18,604 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-17 22:39:20,693 INFO org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1592302610578_0007
2020-04-17 22:39:20,939 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1592302610578_0007
2020-04-17 22:39:20,940 INFO org.apache.flink.yarn.YarnClusterDescriptor - Waiting for the cluster to be allocated
2020-04-17 22:39:21,183 INFO org.apache.flink.yarn.YarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2020-04-17 22:39:41,121 INFO org.apache.flink.yarn.YarnClusterDescriptor - YARN application has been deployed successfully.
2020-04-17 22:39:41,127 INFO org.apache.flink.yarn.YarnClusterDescriptor - Found Web Interface ipsnode3:59460 of application 'application_1592302610578_0007'.
Job has been submitted with JobID 0c96066f2c453e1da5613f2fc7f1e5d5
^C[root@ipsnode1 flink-1.10.1]#
Send test data with Kafka:
[root@ipsnode2 bin]# ./kafka-console-producer.sh --broker-list ipsnode1:9092 -topic IPS_CASH
>aaa 100
>aaa 10
>aaa 1
>bbb 200
>bbb 22
>ccc 333
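The sink inserts into a table named test1 with columns ky and vl. The original DDL is not shown; a minimal schema that matches the insert statement (column types assumed) would be:
CREATE TABLE test1 (ky VARCHAR(64), vl INT);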
Verify in MySQL:
mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from test1;
+------+------+
| ky   | vl   |
+------+------+
| aaa  |  100 |
| aaa  |  110 |
| aaa  |  111 |
| bbb  |  200 |
| bbb  |  222 |
| ccc  |  333 |
+------+------+
6 rows in set (0.00 sec)
mysql>