赞
踩
<!--Flink web ui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
nc -l -p 8888
本地UI界面
Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序
* 运行时由两种类型的进程组成
* 一个 JobManager
* 一个或者多个 TaskManager
JobManager进程由三个不同的组件组
ResourceManager
负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots
Dispatcher
提供了一个 REST 接口,用来提交 Flink 应用程序执行
为每个提交的作业启动一个新的 JobMaster。
运行 Flink WebUI 用来提供作业执行信息
JobMaster
负责管理单个JobGraph的执行,Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster
至少有一个 JobManager,高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby
TaskManager中 task slot 的数量表示并发处理 task 的数量
一个 task slot 中可以执行多个算子,里面多个线程
算子 opetator
source
transformation
sink
对于分布式执行,Flink 将算子的 subtasks _链接_成 tasks,每个 task 由一个线程执行
图中source和map算子组成一个算子链,作为一个task运行在一个线程上
将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
Task Slots 任务槽
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/concepts/glossary/
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
和外部系统进行读取写入的
设置不同的并行度
package cn.mesmile.flink.demo; import cn.mesmile.flink.jdkstream.VideoOrder; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author zb * @date 2022/8/21 16:56 * @Description */ public class FlinkCustomSourceDemo04 { public static void main(String[] args) throws Exception { // 构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建本地 UI 界面操作 127.0.0.1:8081 final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(2); // 本机默认并行数为 12 ---> 本机配置为 6 核 12 线程 VideoOrderSource videoOrderSource = new VideoOrderSource(); DataStream<VideoOrder> videoOrderDataStream = env.addSource(videoOrderSource); videoOrderDataStream.filter(new FilterFunction<VideoOrder>() { @Override public boolean filter(VideoOrder value) throws Exception { return value.getMoney() > 5; } }).setParallelism(3); videoOrderDataStream.print().setParallelism(4); //DataStream需要调用execute,可以取个名称 env.execute("custom source job"); } }
Windows are at the heart of processing infinite streams(Window是处理无限数据量的核心)
数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等
Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算
窗口认为是Bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶
time Window 时间窗口,即按照一定的时间规则作为窗口统计
time-tumbling-window 时间滚动窗口 (用的多)
time-sliding-window 时间滑动窗口 (用的多)
session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用
count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用
滑动窗口 Sliding Windows
定义了要对窗口中收集的数据做的计算操作
增量聚合函数
aggregate(agg函数,WindowFunction(){ })
全窗口函数
apply(new processWindowFunction(){ })
如果想处理每个元素更底层的API的时候用
//对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter
process(new KeyedProcessFunction(){processElement、onTimer}
定义了要对窗口中收集的数据做的计算操作
增量聚合函数
aggregate(agg函数,WindowFunction(){ })
apply(new WindowFunction(){ })
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
WindowFunction<IN, OUT, KEY, W extends Window>
全窗口函数
process(new ProcessWindowFunction(){})
窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
ProcessWindowFunction<IN, OUT, KEY, W extends Window>
窗口函数对比
做了一个电商平台买 “超短男装衣服”,如果要统计10分钟内成交额,你认为是哪个时间比较好?
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,10 // 触发 13 - 3 ≥ 10
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:23,10 // 触发 23 -3 ≥ 20
Flink 最后的兜底延迟数据处理 测输出流实战
简介: Flink乱序延迟时间处理-多层保证措施介绍和归纳
新接口,WatermarkStrategy
,TimestampAssigner
和 WatermarkGenerator
因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式
新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了
Flink的状态State介绍和应用场景解析
有状态和无状态介绍
状态管理分类
State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)
State状态后端:存储在哪里
Flink 内置了以下这些开箱即用的 state backends :
状态详解
RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)
配置
flink-conf.yaml
使用配置键在 中配置默认状态后端state.backend
。#全局配置例子一
state.backend: hashmap
state.checkpoint-storage: jobmanager
#全局配置例子二
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem
//代码配置一(基于内存) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); //代码配置二(基于磁盘) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend()); env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir"); //或者 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir")); - 备注:使用 RocksDBStateBackend 需要加依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId> <version>1.13.1</version> </dependency>
Flink的Checkpoint-SavePoint和端到端状态一致性介绍
//全局配置checkpoints
state.checkpoints.dir: hdfs:///checkpoints/
//作业单独配置checkpoints
env.getCheckpointConfig().setCheckpointStorage(“hdfs:///checkpoints-data/”);
//全局配置savepoint
state.savepoints.dir: hdfs:///flink/savepoints
Savepoint 与 Checkpoint 的不同之处
端到端(end-to-end)状态一致性
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
DataStream<Event> input = ... Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.getId() == 42; } } ).next("middle").subtype(SubEvent.class).where( new SimpleCondition<SubEvent>() { @Override public boolean filter(SubEvent subEvent) { return subEvent.getVolume() >= 10.0; } } ).followedBy("end").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.getName().equals("end"); } } ); PatternStream<Event> patternStream = CEP.pattern(input, pattern); DataStream<Alert> result = patternStream.process( new PatternProcessFunction<Event, Alert>() { @Override public void processMatch( Map<String, List<Event>> pattern, Context ctx, Collector<Alert> out) throws Exception { out.collect(createAlertFrom(pattern)); } }); - CEP并不包含在flink中,使用前需要自己导入 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.version}</artifactId> <version>${flink.version}</version> </dependency>
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
* 文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/overview/
* Local 本地部署,直接启动进程,适合调试使用
* 直接部署启动服务
* Standalone Cluster集群部署,flink自带集群模式
* Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
* Kubernetes 部署
* Docker部署
* Flink下载地址
* https://flink.apache.org/zh/downloads.html
* flink版本 1.13.1(课程安装包那边有提供)
* 步骤
* 解压 tar -zxvf
* 目录介绍
* conf
* flink-conf.yaml
#web ui 端口 rest.port=8081 #调整 jobmanager.memory.process.size: 1000m taskmanager.memory.process.size: 1000m * bin * start-cluster.sh * stop-cluster.sh * yarn-session.sh * example * 启动 bin/start-cluster.sh * 停止 bin/stop-cluster.sh * 查看进程 jps * TaskManagerRunner * StandaloneSessionClusterEntrypoint * 网络安全组或者防火墙开放端口 8081 * 访问地址 http://ip:8081
flink测试官方案例
cd /usr/local/software/flink/examples/source
vim xdclass_source.txt
java xdclass
springboot springcloud
html flink
springboot redis
java flink
kafka flink
java springboot
./flink run /usr/local/software/flink/examples/batch/WordCount.jar
--input /usr/local/software/flink/examples/source/xdclass_source.txt
--output /usr/local/software/flink/examples/source/xdclass_result.txt
<build> <finalName>xdclass-flink</finalName> <plugins> <!--默认编译版本比较低,所以用compiler插件,指定项目源码的jdk版本,编译后的jdk版本和编码,--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> <encoding>${file.encoding}</encoding> </configuration> </plugin> <!-- 添加依赖到jar包 --> <!--<plugin>--> <!--<artifactId>maven-assembly-plugin</artifactId>--> <!--<configuration>--> <!--<descriptorRefs>--> <!--<descriptorRef>jar-with-dependencies</descriptorRef>--> <!--</descriptorRefs>--> <!--</configuration>--> <!--<executions>--> <!--<execution>--> <!--<id>make-assembly</id>--> <!--<phase>package</phase>--> <!--<goals>--> <!--<goal>single</goal>--> <!--</goals>--> <!--</execution>--> <!--</executions>--> <!--</plugin>--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
运行
通过WebUI部署Flink项目到阿里云Linux运行
* 访问WebUI
* 上传jar包
* 选择main入口类APP
* 提交任务查看情况
* Task Solt 是指taskmanager的并发执行能力,parallelism是指taskmanager实际使用的并发能力
taskmanager.numberOfTaskSlots:4
假如每一个taskmanager中的分配4个TaskSlot,
那有3个taskmanager一共有12个TaskSlot
AA,2022-11-11 12:01:01,-1
BB,2022-11-11 12:01:02,1
AA,2022-11-11 12:01:04,-1
AA,2022-11-11 12:01:05,-1
version: "3.7" services: jobmanager: image: flink:scala_2.12-java8 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager: image: flink:scala_2.12-java8 depends_on: - jobmanager command: taskmanager scale: 3 environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 2
每个 manage 有 2 个 slot 所以最大并行度为 3(个manage)* 2(个slot) =6
version: "3.7" services: flink-jobmanager-01: image: flink:scala_2.12-java8 container_name: flink-jobmanager-01 hostname: flink-jobmanager-01 expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01 flink-taskmanager-01: image: flink:scala_2.12-java8 container_name: flink-taskmanager-01 hostname: flink-taskmanager-01 expose: - "6121" - "6122" depends_on: - flink-jobmanager-01 command: taskmanager links: - "flink-jobmanager-01:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01 flink-taskmanager-02: image: flink:scala_2.12-java8 container_name: flink-taskmanager-02 hostname: flink-taskmanager-02 expose: - "6121" - "6122" depends_on: - flink-jobmanager-01 command: taskmanager links: - "flink-jobmanager-01:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager-01
端口说明
The Web Client is on port 8081
JobManager RPC port 6123
TaskManagers RPC port 6122
TaskManagers Data port 6121
注意:
expose暴露容器给link到当前容器的容器
ports是暴露容器端口到宿主机端口进行映
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。