赞
踩
Apache Flink 是一个分布式大数据处理引擎,不负责存储,可对有限数据流和无限数据流进行有状态计算。可部署在各种集群环境,对各种大小的数据规模进行快速计算。
一、flink介绍
1、特点
批流统一
支持高吞吐、低延迟、高性能的流处
支持带有事件时间的窗口(Window)操作
支持有状态计算的Exactly-once语义
支持高度灵活的窗口(Window)操作,支持基于time、count、session窗口操作
支持具有Backpressure功能的持续流模型
支持基于轻量级分布式快照(Snapshot)实现的容错
支持迭代计算
Flink在JVM内部实现了自己的内存管理
支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
2、和别的框架对比
3、flink角色
JobManager:
也称之为Master,用于协调分布式执行,它用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master,如果配置高可用模式则会存在多个master,它们其中有一个是leader,而其他的都是standby。
TaskManager:
也称之为Worker,用于执行一个dataflow的task、数据缓冲和Data Streams的数据交换,Flink运行时至少会存在一个TaskManager。JobManager和TaskManager可以直接运行在物理机上,或者运行YARN这样的资源调度框架,TaskManager通过网络连接到JobManager,通过RPC通信告知自身的可用性进而获得任务分配。
4、模式
local模式、standalone模式、onyarn模式
二、flink安装
1、上传解压安装包
下载地址:https://flink.apache.org/downloads.html
tar -xvf flink-1.9.1-bin-scala_2.11.tgz -C /bigdata/
2、修改配置文件
vim flink-conf.yaml
#指定jobmanager的地址
jobmanager.rpc.address: hadoop01
#指定taskmanager的可用槽位的数量
taskmanager.numberOfTaskSlots: 2
vim slaves
hadoop02
hadoop03
3、拷贝文件
scp -r /bigdata/flink-1.9.1/ hadoop02:/bigdata/flink-1.9.1/
scp -r /bigdata/flink-1.9.1/ hadoop03:/bigdata/flink-1.9.1/
4、启动
bin/start-cluster.sh
5、访问
http://192.168.1.103:8081/#/overview
4、模板初始化
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.1 \
-DgroupId=dashuju.flink \
-DartifactId=flink-java \
-Dversion=1.0 \
-Dpackage=dashuju.flink \
-DinteractiveMode=false
② 或者在命令行中执行下面的命令,需要有网络
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.1
三、java版本wordcount(匿名内部类)
package cn._51doit.flink.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建一个flink stream 程序的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用StreamExecutionEnvironment创建DataStream
//Source
DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));
//Transformation开始
//调用DataStream上的方法Transformation(s)
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
//切分
String[] words = line.split(" ");
for (String word : words) {
//输出
out.collect(word);
}
}
});
//将单词和一组合
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);
//Transformation结束
// 调用Sink (Sink必须调用)
summed.print();
//启动
env.execute("StreamWordCount");
}
}
四、java版本wordcount(Lambda)
package cn._51doit.flink.day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class LambdaStreamWordCount {
public static void main(String[] args) throws Exception {
//创建一个flink stream 程序的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用StreamExecutionEnvironment创建DataStream
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用DataStream上的方法Transformation(s)
SingleOutputStreamOperator<String> word = lines.flatMap((String line, Collector<String> out) ->
Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING);
//将单词和一组合
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = word.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);
// 调用Sink (Sink必须调用)
summed.print();
//执行
env.execute("LambdaStreamWordCount");
}
}
五、scala版本wordcount
package cn._51doit.flink.day01
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
//创建flink执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//flink通过env创建抽象的数据集(Datastream)
//Source
val lines: DataStream[String] = env.socketTextStream("localhost", 8888)
//Transformation开始
//切分压平,需要导入隐式转换
val words: DataStream[String] = lines.flatMap(_.split(" "))
//将单词和1组合
val wordAndOne: DataStream[(String, Int)] = words.map((_, 1))
//分组聚合
val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
//聚合
val summed: DataStream[(String, Int)] = keyed.sum(1)
//Transformation结束
//调用Sink
summed.print()
//执行
env.execute("StreamWordCount")
}
}
六、提交任务
方式一:web提交
方式二:命令提交
/bigdata/flink-1.9.1/bin/flink run -m hadoop01:8081 -p 4 -c cn._51doit.flink.StreamingWordCount /root/hello-flink-java-1.0.jar hadoop01 8888
参数说明:
-m指定主机名后面的端口为JobManager的REST的端口,而不是RPC的端口,RPC通信端口是6123
-p 指定是并行度
-c 指定main方法的全类名
7、任务终止
通过saveCheckpoint,重新提交
或者使用代码
//任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。