Event-driven applications
Flink's layered APIs
Flink applications support both batch and streaming analytics
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Splits each input line on spaces and emits a (word, 1) tuple per word.
public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}
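As a side note, the same logic can be written inline as a Java lambda instead of a separate class; because Java erases generic types, Flink then needs an explicit returns(...) type hint. A minimal sketch (the class name WordCountLambda is just for illustration):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountLambda {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.readTextFile("src/main/resources/a.txt");
        input
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            })
            // Lambdas lose their generic type information to erasure,
            // so Flink needs an explicit type hint here.
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .sum(1)
            .print();
        env.execute();
    }
}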
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file
        DataSet<String> inputDataSet = env.readTextFile("src/main/resources/a.txt");
        // Tokenize, flatten, group, and sum
        DataSet<Tuple2<String, Integer>> wordCountDataSet = inputDataSet
                .flatMap(new MyFlatMap())
                .groupBy(0)  // group by position 0 of the Tuple2 (the word)
                .sum(1);     // sum position 1 of the Tuple2 (the count)
        // Print the result
        wordCountDataSet.print();
    }
}
Output:
(dd,3)
(aa,4)
(bb,1)
(cc,2)
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the input file
        DataStream<String> inputDataStream = env.readTextFile("src/main/resources/a.txt");
        // Tokenize, flatten, key, and sum
        DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        // Print with parallelism 1 so the output is not interleaved
        wordCountDataStream.print().setParallelism(1);
        // Launch the job
        env.execute();
    }
}
Output (the streaming job emits an updated running count for every record it processes, so intermediate counts appear):
(dd,1)
(cc,1)
(dd,2)
(cc,2)
(dd,3)
(aa,1)
(aa,2)
(aa,3)
(aa,4)
(bb,1)
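One note on keyBy(0): the positional form works in Flink 1.10.1, but later releases deprecate it in favor of a typed KeySelector. An equivalent keying for the pipeline above would look like this (a sketch, reusing inputDataStream from WordCountStream):

// Equivalent to keyBy(0): key by the word field (f0) of the tuple.
// This lambda form survives the deprecation of keyBy(int) in later Flink releases.
DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
        .flatMap(new MyFlatMap())
        .keyBy(value -> value.f0)
        .sum(1);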
Enter this command in the virtual machine:
nc -lk 7777
Then run the Java code below (the only change from WordCountStream is the source: socketTextStream replaces readTextFile):
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamSocket {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read from a network socket
        DataStream<String> inputDataStream = env.socketTextStream("hadoop100", 7777);
        // Tokenize, flatten, key, and sum
        DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        // Print with parallelism 1
        wordCountDataStream.print().setParallelism(1);
        // Launch the job
        env.execute();
    }
}
Output: running counts are printed as lines are typed into nc.
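Rather than hard-coding the host and port, they could be read from program arguments with Flink's ParameterTool. A sketch, assuming --host and --port flag names of my own choosing:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamSocketArgs {
    public static void main(String[] args) throws Exception {
        // Parse arguments such as: --host hadoop100 --port 7777
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host", "localhost"); // default if flag absent
        int port = params.getInt("port", 7777);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.socketTextStream(host, port);
        input.flatMap(new MyFlatMap()).keyBy(0).sum(1).print().setParallelism(1);
        env.execute();
    }
}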
Download Flink from the Apache archive:
https://archive.apache.org/dist/flink/
tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C $B_HOME/
cd $B_HOME
mv flink-1.10.1 flink
Add the FLINK_HOME environment variable (see https://yellow520.blog.csdn.net/article/details/112692486):
export FLINK_HOME=$B_HOME/flink
The job reuses the same MyFlatMap class shown earlier.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read from a network socket
        DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);
        // Tokenize, flatten, key, and sum
        DataStream<Tuple2<String, Integer>> ds = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        // Print (on a cluster this goes to the TaskManager's stdout, visible in the web UI)
        ds.print();
        // Launch the job
        env.execute();
    }
}
Open the socket source on the server before submitting the job:
nc -lk 7777
Put Flink's shaded Hadoop jar into Flink's lib directory:
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
ll $FLINK_HOME/lib
Start Hadoop:
hadoop.py start
Flink-on-YARN architecture diagram
Session mode initializes a Flink cluster inside YARN with a fixed amount of resources; all subsequent jobs are submitted to this cluster.
This Flink cluster stays resident in the YARN cluster.
$FLINK_HOME/bin/yarn-session.sh \
-s 2 \
-jm 1024 \
-tm 1024 \
-nm a1 \
-d
Parameter | Description |
---|---|
-s (--slots) | Number of slots per TaskManager |
-jm | JobManager memory (MB) |
-tm | Memory per TaskManager (MB) |
-nm | Application name in YARN |
-d | Run detached (in the background) |
Submit a job to the running session:
$FLINK_HOME/bin/flink run -c WordCount FlinkPractise-1.0-SNAPSHOT.jar
Result: as the figure shows, 5 words were processed, and the Records count is also 5.
To list running YARN applications, or to kill one:
yarn application --list
yarn application --kill application_1626235724262_0001
In Per-Job mode, every submission creates a new Flink cluster, so jobs are independent of one another.
The cluster is torn down once its job finishes.
Compared with the earlier command, the only addition is -m yarn-cluster:
$FLINK_HOME/bin/flink run \
-m yarn-cluster \
-c WordCount \
FlinkPractise-1.0-SNAPSHOT.jar