Add the following to pom.xml
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <properties>
- <flink.version>1.13.0</flink.version>
- <java.version>1.8</java.version>
- <scala.binary.version>2.12</scala.binary.version>
- <slf4j.version>1.7.30</slf4j.version>
- </properties>
- <dependencies>
- <!-- Flink dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-cep_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- Logging dependencies -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-to-slf4j</artifactId>
- <version>2.14.0</version>
- </dependency>
- </dependencies>
log4j.properties
- log4j.rootLogger=ERROR, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Batch (DataSet API) word count. Contents of the input file input/wc.txt:
- nihao nihao
- jiushi nihao
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.MapOperator;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
-
- public class FlinkSoctet {
- public static void main(String[] args) throws Exception {
- //get the batch execution environment
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSource<String> initData = env.readTextFile("input/wc.txt");
-
- //an advantage of anonymous inner classes is that you don't have to worry about type erasure (a lambda alternative is sketched after this listing)
- MapOperator<String, Tuple2<String, Integer>> mapValue = initData.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String textItem, Collector<String> out) throws Exception {
- String[] resItem = textItem.split(" ");
- for (String s : resItem) {
- out.collect(s);
- }
- }
- }).map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String item) throws Exception {
- return Tuple2.of(item, 1);
- }
- });
-
- //mapValue is a dataset of (key, value) tuples; 0 is the position of the key and 1 the position of the value
- //group by the key and sum the values
- mapValue.groupBy(0).sum(1).print();
-
- }
- }
- (nihao,3)
- (jiushi,1)
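As the comment in the listing notes, anonymous inner classes keep the generic types visible to Flink. With Java lambdas the Tuple2<String, Integer> output type is erased at compile time, so it has to be declared explicitly via returns(...). A minimal sketch of the same pipeline written with lambdas (it additionally needs the import org.apache.flink.api.common.typeinfo.Types):
- //lambda version: output types must be supplied explicitly because of type erasure
- env.readTextFile("input/wc.txt")
-         .flatMap((String line, Collector<String> out) -> {
-             for (String word : line.split(" ")) {
-                 out.collect(word);
-             }
-         })
-         .returns(Types.STRING)
-         .map(word -> Tuple2.of(word, 1))
-         .returns(Types.TUPLE(Types.STRING, Types.INT))
-         .groupBy(0)
-         .sum(1)
-         .print();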
Streaming (DataStream API) word count, reading the same input/wc.txt:
- nihao nihao
- jiushi nihao
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- public class FlinkSoctet {
- public static void main(String[] args) throws Exception {
- //get the stream execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> initData = env.readTextFile("input/wc.txt");
-
- SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String item, Collector<String> out) throws Exception {
- String[] resItem = item.split(" ");
- for (String s : resItem) {
- out.collect(s);
- }
- }
- }).map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String item) throws Exception {
- return Tuple2.of(item, 1);
- }
- });
-
- //key the tuple stream by the word and aggregate (a lambda form of this KeySelector is shown after the output below)
- map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- }).sum(1).print();
-
- //this is a streaming job, so execute() must be called for it to run continuously
- env.execute();
- }
- }
The number in front of each record is the index of the parallel subtask (task) it was assigned to; records with the same key always go to the same subtask, e.g. all of the nihao records are handled by subtask 1.
- 2> (jiushi,1)
- 1> (nihao,1)
- 1> (nihao,2)
- 1> (nihao,3)
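The KeySelector above can also be written as a lambda; a minimal sketch of just the grouping step, reusing the map variable from the listing:
- //key by the word (field f0) and sum the counts (position 1)
- map.keyBy(value -> value.f0)
-         .sum(1)
-         .print();
The prefix in the printed output is the 1-based index of the parallel subtask of the print sink; env.setParallelism(n) controls how many such subtasks there are.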
On the Linux machine that will send the data, run
nc -lk 9997
- public class FlinkSoctet {
- public static void main(String[] args) throws Exception {
- //get the stream execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> initData = env.socketTextStream("master",9997);
-
- SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String item, Collector<String> out) throws Exception {
- String[] resItem = item.split(" ");
- for (String s : resItem) {
- out.collect(s);
- }
- }
- }).map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String item) throws Exception {
- return Tuple2.of(item, 1);
- }
- });
-
- //key the tuple stream by the word and aggregate
- map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- }).sum(1).print();
-
- //this is a streaming job, so execute() must be called for it to run continuously
- env.execute();
- }
- }
Input data:
q v d s fd a q
- 9> (d,1)
- 8> (fd,1)
- 4> (q,1)
- 9> (s,1)
- 4> (q,2)
- 11> (a,1)
- 3> (v,1)
Cluster plan
master | node1 | node2
JobManager, TaskManager | TaskManager | TaskManager
Extract the archive
tar -zxvf flink-1.13.2-bin-scala_2.12.tgz
Edit the configuration files
flink-conf.yaml
jobmanager.rpc.address: master
masters
master:8081
workers
- master
- node1
- node2
Distribute the directory to the other machines in the cluster
./xsync /home/bigdata/congxueflink
Start the cluster (from the bin directory)
./start-cluster.sh
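To verify that the processes are up you can run jps on each machine; with Flink 1.13 the master should show a StandaloneSessionClusterEntrypoint and every worker a TaskManagerRunner:
jps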
Stop the cluster
./stop-cluster.sh
Add the packaging plugins
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.0.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
Write a program that listens on a socket
- public class FlinkSoctet {
- public static void main(String[] args) throws Exception {
- //get the stream execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> initData = env.socketTextStream("master",9997);
-
- SingleOutputStreamOperator<Tuple2<String, Integer>> map = initData.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String item, Collector<String> out) throws Exception {
- String[] resItem = item.split(" ");
- for (String s : resItem) {
- out.collect(s);
- }
- }
- }).map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String item) throws Exception {
- return Tuple2.of(item, 1);
- }
- });
-
- //key the tuple stream by the word and aggregate
- map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- }).sum(1).print();
-
- //this is a streaming job, so execute() must be called for it to run continuously
- env.execute();
- }
- }
Then package the project
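With the maven-assembly-plugin configured above, the package phase produces a jar-with-dependencies; for example, from the project root:
mvn clean package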
After a successful submission the job starts running, and you can view the data it has processed.
To submit from the command line:
-m is the address where the JobManager accepts submissions
-c is the main class of the submitted jar
-p is the parallelism
The final argument is the jar you packaged yourself
./flink run -m master:8081 -c com.chongxue.flink.FlinkSoctet -p 2 flink-1.0-SNAPSHOT.jar
Cancel a job (the argument is the job ID)
./flink cancel d9e4f4dcb0516551d6611675c16113bb
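If you don't know the job ID, you can list the running jobs first:
./flink list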
Configure the Hadoop environment variables (required for Flink on YARN)
sudo vi /etc/profile.d/my_env.sh
- export HADOOP_HOME=/home/bigdata/hadoop/hadoop
- export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
- export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
- export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile.d/my_env.sh
Optional: adjust the startup resource allocation in flink-conf.yaml according to your own needs
vim flink-conf.yaml
- jobmanager.memory.process.size: 1600m
- taskmanager.memory.process.size: 1728m
- taskmanager.numberOfTaskSlots: 8
- parallelism.default: 1
Start a YARN session (from the bin directory):
./yarn-session.sh -nm test
Available parameters:
⚫ -d: detached mode. Use this if you don't want the Flink YARN client to keep running in the foreground; the YARN session keeps running in the background even after the current terminal session is closed.
⚫ -jm (--jobManagerMemory): memory for the JobManager, in MB by default.
⚫ -nm (--name): the application name shown in the YARN UI.
⚫ -qu (--queue): the YARN queue to submit to.
⚫ -tm (--taskManager): memory used by each TaskManager.
Note: since Flink 1.11.0 the -n and -s parameters (number of TaskManagers and number of slots) are no longer used; YARN allocates TaskManagers and slots dynamically on demand. In that sense the session mode on YARN does not pin cluster resources either, they are allocated dynamically as well. A combined example invocation follows below.
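For example, a detached session with explicit memory sizes, a queue, and a name (the values here are placeholders, adjust them to your cluster):
./yarn-session.sh -d -jm 1024 -tm 2048 -qu default -nm test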
Once the YARN session has started, it prints a web UI address and a YARN application ID; jobs can then be submitted either through the web UI or from the command line.
Open http://node1:34982/#/overview (the web UI address printed when the session started).
Submit a job (node1:34982 is the web UI address and port from the previous step)
./flink run -m node1:34982 -c com.chongxue.flink.FlinkSoctet -p 2 flink-1.0-SNAPSHOT.jar
(Per-job mode: each submission starts up a complete cluster.)
Start the job
./flink run -d -t yarn-per-job -c com.chongxue.flink.FlinkSoctet flink-1.0-SNAPSHOT.jar
Stop the job
$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
Here application_XXXX_YY is the ID of the current application and <jobId> is the ID of the job. Note that cancelling the job also shuts down the whole Flink cluster.