赞
踩
需求:写一个文本,统计出单词的个数
1.使用flink批处理
查看WordCountBatchDemo类
2.使用flink流处理
查看WordCountStreamDemo类
3.读取socket文本流,统计单词个数
查看WordCountStreamUnboundedDemo类
代码地址:学习Flink1.17代码仓库: 学习Flink1.17代码的所有代码
1.集群规划
2.解压安装包
1.解压
[atguigu@hadoop102 software]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
2.修改配置文件
[atguigu@hadoop102 conf]$ vim flink-conf.yaml
# JobManager节点地址. jobmanager.rpc.address: hadoop102 jobmanager.bind-host: 0.0.0.0 rest.address: hadoop102 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.bind-host: 0.0.0.0 taskmanager.host: hadoop102
[atguigu@hadoop102 conf]$ vim workers
hadoop102 hadoop103 hadoop104
[atguigu@hadoop102 conf]$ vim masters
hadoop102:8081
3.分发
[atguigu@hadoop102 module]$ xsync flink-1.17.0/
4.修改分发后的配置
在hadoop103上修改:
[atguigu@hadoop103 conf]$ vim flink-conf.yaml
# TaskManager节点地址.需要配置为当前机器名 taskmanager.host: hadoop103
在hadoop104上修改
[atguigu@hadoop104 conf]$ vim flink-conf.yaml
# TaskManager节点地址.需要配置为当前机器名 taskmanager.host: hadoop104
5.启动
[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
[atguigu@hadoop102 flink-1.17.0]$ jpsall =============== hadoop102 =============== 4453 StandaloneSessionClusterEntrypoint 4458 TaskManagerRunner 4533 Jps =============== hadoop103 =============== 2872 TaskManagerRunner 2941 Jps =============== hadoop104 =============== 2948 Jps 2876 TaskManagerRunner
6.访问
1.将WordCount案例打包
代码地址:学习Flink1.17代码仓库: 学习Flink1.17代码的所有代码
在hadoop启动7777端口
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777 hello
2.向集群中上传jar包
3.运行
4.查看结果
5.关闭
先启动一个集群,保持会话,在想上面提交作业,例:1.2.3一样(向集群提交作业)
适合:单个规模小,执行时间短的大量作业
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业 ( Per-Job) 模式。
作业完成后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes (K8S)
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobMatager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。 所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManage,也就是创建一个集群。这个JobManager只为执行这一个应用而存在执行结束之后JobIManager也就关闭了,这就是所谓的应用模式。
应用模式与单作业模式,都是提交作业之后才创建集群,单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群,而应用模式下,是直接由JobIManaget执行应用程序的。
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
1.会话模式
2.单作业模式
3.应用模式
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
案例:
1.配置环境变量
vim /etc/profile.d/my_env.sh
HADOOP_HOME=/opt/module/hadoop-3.3.4 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=`hadoop classpath`
2.启动hadoop和启动端口
[atguigu@hadoop102 hadoop-3.3.4]$ start-dfs.sh [atguigu@hadoop103 hadoop-3.3.4]$ start-yarn.sh [atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
3.修改配置(hadoop102,hadoop103,hadoop104都需要修改)
[atguigu@hadoop102 conf]$ vim flink-conf.yaml classloader.check-leaked-classloader: false
4.上传jar包并运行(主要,上传的jar包的地址要写对哟)
/opt/module/flink-1.17.0/bin/flink run -d -t yarn-per-job -c com.wsjj.yjh.WordCountStreamUnboundedDemo /opt/module/flink-1.17.0/bin/FlinkTutorial-1.17-1.0-SNAPSHOT.jar
5.查看
6.关闭
1.会话模式
2.单作业模式
3.应用模式
1.hdfs中创建目录
hadoop fs -mkdir -p /logs/flink-job
2.添加配置(在hadoop102,hadoop103,hadoop104上都要添加)
[atguigu@hadoop102 conf]$ vim flink-conf.yaml
jobmanager.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job historyserver.web.address: hadoop102 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job historyserver.archive.fs.refresh-interval: 5000
3.启动
bin/historyserver.sh start
4.访问
5.关闭
bin/historyserver.sh stop
JobManager是一个Flink集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的JobManager所控制执行。
JobManger又包含3个不同的组件。
(1)JobMaster
JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。所以JobMaster和具体的Job是一一对应的,多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster。需要注意在早期版本的Flink中,没有JobMaster的概念;而JobManager的概念范围较小,实际指的就是现在所说的JobMaster。
在作业提交时,JobMaster会先接收到要执行的应用。JobMaster会把JobGraph转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。JobMaster会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
而在运行过程中,JobMaster会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
(2)资源管理器(ResourceManager)
ResourceManager主要负责资源的分配和管理,在Flink 集群中只有一个。所谓“资源”,主要是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。
这里注意要把Flink内置的ResourceManager和其他资源管理平台(比如YARN)的ResourceManager区分开。
(3)分发器(Dispatcher)
Dispatcher主要负责提供一个REST接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。
TaskManager是Flink中的工作进程,数据流的具体计算就是它来做的。Flink集群中必须至少有一个TaskManager;每一个TaskManager都包含了一定数量的任务槽(task slots)。Slot是资源调度的最小单位,slot的数量限制了TaskManager能够并行处理的任务数量。
启动之后,TaskManager会向资源管理器注册它的slots;收到资源管理器的指令后,TaskManager就会将一个或者多个槽位提供给JobMaster调用,JobMaster就可以分配任务来执行了。
在执行过程中,TaskManager可以缓冲数据,还可以跟其他运行同一应用的TaskManager交换数据。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism),一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
例如:如上图所示,当前数据流中有source、map、window、sink四个算子,其中sink算子的并行度为1,其他算子的并行度都为2。所以这段流处理程序的并行度就是2。
1.代码中设置
我们在代码中,可以很简单地在算子后跟着调用setParallelism()方法,来设置当前算子的并行度:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。
另外,我们也可以直接调用执行环境的setParallelism()方法,全局设定并行度:
env.setParallelism(2);
这样代码中所有算子,默认的并行度就都为2了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要注意的是,由于keyBy不是算子,所以无法对keyBy设置并行度。
2.提交应用时设置
在使用flink run命令提交应用时,可以增加-p参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount
./FlinkTutorial-1.0-SNAPSHOT.jar
如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度。
3.配置文件中设置
我们还可以直接在集群的配置文件flink-conf.yaml中直接更改默认并行度:
parallelism.default: 2
这个设置对于整个集群上提交的所有作业有效,初始值为1。无论在代码中设置、还是提交时的-p参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数。
1)算子间的数据传输
一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。
(1)一对一(One-to-one,forwarding)
这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map算子,source算子读取数据之后,可以直接发送给map算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。
(2)重分区(Redistributing)
在这种模式下,数据流的分区会发生改变。比如图中的map和后面的keyBy/window算子之间,以及keyBy/window算子和Sink算子之间,都是这样的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。
2)合并算子链
在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分,如下图所示。每个task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。
上图中Source和map之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有5个任务,由5个线程并行执行。
将算子链接成task是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
Flink默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:
// 禁用算子链 .map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链 .map(word -> Tuple2.of(word, 1L)).startNewChain()
每个任务槽(task slot)其实表示了TaskManager拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。
在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中,可以设置TaskManager的slot数量,默认是1个slot。
taskmanager.numberOfTaskSlots: 8
需要注意的是,slot目前仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。
任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是静态的概念
*1)逻辑流图(StreamGraph)*
这是根据用户通过 DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构。这一步一般在客户端完成。
*2)作业图(JobGraph)*
StreamGraph经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的,在作业提交时传递给JobMaster。
我们提交作业之后,打开Flink自带的Web UI,点击作业就能看到对应的作业图。
*3)执行图(ExecutionGraph)*
JobMaster收到JobGraph后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。
*4)物理图(Physical Graph)*
JobMaster生成执行图后,会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。
物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算了。
1)getExecutionEnvironment
最简单的方式,就是直接调用getExecutionEnvironment方法。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2)createLocalEnvironment
这个方法返回一个本地执行环境。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
这个方法返回集群执行环境。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // JobManager主机名 1234, // JobManager进程端口号 "path/to/jarFile.jar" // 提交给JobManager的JAR包 );
1. 流执行模式(Streaming)
// 流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.批执行模式(Batch)
3.自动模式(AutoMatic)
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
env.execute();
代码地址:学习Flink1.17代码仓库: 学习Flink1.17代码的所有代码
public class ListSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ArrayList<Integer> data1 = new ArrayList<>(); data1.add(1); data1.add(2); data1.add(3); List<Integer> data = Arrays.asList(1, 2, 3, 4); DataStreamSource<Integer> ds = env.fromCollection(data); ds.print(); env.execute(); } }
public class FileSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); org.apache.flink.connector.file.src.FileSource<String> fileSource = org.apache.flink.connector.file.src.FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/flink.txt")).build(); DataStreamSource<String> files = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "files"); SingleOutputStreamOperator<Tuple2<String, Long>> map = files.map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String value) throws Exception { String[] s = value.split(" "); for (String s1 : s) { return Tuple2.of(s1, 1L); } return null; } }); KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() { @Override public String getKey(Tuple2<String, Long> value) throws Exception { return value.f0; } }); SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1); sum.print(); env.execute(); } }
public class SocketSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); source.print(); env.execute(); } }
public class KafkaSourceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); DataStreamSource<String> stringDataStreamSource = env.fromSource(KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092") .setTopics("topic_sink") .setGroupId("yjh") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(), WatermarkStrategy.noWatermarks(), "kafka-source"); stringDataStreamSource.print(); env.execute(); } }
public class DataGeneratorDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> datagenerator = env.fromSource(new DataGeneratorSource<String>( new GeneratorFunction<Long, String>() { @Override public String map(Long aLong) throws Exception { return "Number:" + aLong; } }, Long.MAX_VALUE, RateLimiterStrategy.perCheckpoint(10), Types.STRING), WatermarkStrategy.noWatermarks(), "datagenerator"); datagenerator.print(); env.execute(); } }
*1)Flink的类型系统*
Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
*2)Flink支持的数据类型*
对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:
(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。
(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型
l Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
l Scala 样例类及Scala元组:不支持空字段。
l 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
l POJO:Flink自定义的类似于Java bean模式的类。
(4)辅助类型
Option、Either、List、Map等。
(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
l 类是公有(public)的
l 有一个无参的构造方法
l 所有属性都是公有(public)的
l 所有属性的类型都是可以序列化的
*3)类型提示(Type Hints)*
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
准备工作:
@Data @NoArgsConstructor @AllArgsConstructor public class WaterSensor { public String id; public Long ts; public Integer vc; }
主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
案例:实现了提取WaterSensor中的id字段的功能。
public class TransMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> source = env.fromElements( new WaterSensor("sensor_1", 1L, 1), new WaterSensor("sensor_2", 2L, 2), new WaterSensor("sensor_3", 3L, 3) ); // 方式一:传入匿名类,实现MapFunction source.map(new MapFunction<WaterSensor, String>() { @Override public String map(WaterSensor value) throws Exception { return value.id; } }).print(); // 方式二:传入MapFunction的实现类 source.map(new UserMap()).print(); env.execute(); } private static class UserMap implements MapFunction<WaterSensor,String>{ @Override public String map(WaterSensor value) throws Exception { return value.id; } } }
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。
案例:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。
public class TransFilter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> source = env.fromElements( new WaterSensor("sensor_1", 1L, 1), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_2", 2L, 2), new WaterSensor("sensor_3", 3L, 3) ); // 方式一:传入匿名类实现FilterFunction source.filter(new FilterFunction<WaterSensor>() { @Override public boolean filter(WaterSensor value) throws Exception { return value.id.equals("sensor_1"); } }).print(); // 方式二:传入FilterFunction实现类 source.filter(new UserFilter()).print(); env.execute(); } private static class UserFilter implements FilterFunction<WaterSensor> { @Override public boolean filter(WaterSensor value) throws Exception { return value.id.equals("sensor_1"); } } }
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
案例:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。
//如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。 public class TransFlatmap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> source = env.fromElements( new WaterSensor("sensor_1", 1L, 1), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_2", 2L, 2), new WaterSensor("sensor_3", 3L, 3) ); source.flatMap(new FlatMapFunction<WaterSensor, Long>() { @Override public void flatMap(WaterSensor value, Collector<Long> out) throws Exception { String id = value.getId(); if(id.equals("sensor_1")){ out.collect(value.getVc().longValue()); // out.collect(Long.valueOf(value.getVc())); } if(id.equals("sensor_2")){ out.collect(value.getVc().longValue()); out.collect(value.getTs()); } } }).print(); env.execute(); } }
对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。
在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。
public class TransKeyBy { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> source = env.fromElements( new WaterSensor("sensor_1", 1L, 1), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_2", 2L, 2), new WaterSensor("sensor_3", 3L, 3) ); // 方式一:使用匿名类实现KeySelector source.keyBy(new KeySelector<WaterSensor, String>() { @Override public String getKey(WaterSensor value) throws Exception { return value.id; } }).print(); // 方式二:使用Lambda表达式 source.keyBy(value -> value.id).print(); env.execute(); } }
sum():在输入流上,对指定的字段做叠加求和的操作。
min():在输入流上,对指定的字段求最小值。
max():在输入流上,对指定的字段求最大值。
minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
public class TransAggregation { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> source = env.fromElements( new WaterSensor("sensor_1", 1L, 1), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_2", 2L, 2), new WaterSensor("sensor_3", 3L, 3) ); KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = source.keyBy(value -> value.id); // waterSensorStringKeyedStream.sum("vc").print(); // waterSensorStringKeyedStream.max("vc").print(); waterSensorStringKeyedStream.maxBy("vc").print(); env.execute(); } }
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
案例:使用reduce实现max和maxBy的功能。
// 使用reduce实现max和maxBy的功能。 //需要在linux中打开端口 命令为: nc -lk 7777 public class TransReduce { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("hadoop102", 7777) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); } }); KeyedStream<WaterSensor, String> keyedStream = map.keyBy(value -> value.id); //max // keyedStream.reduce(new ReduceFunction<WaterSensor>() { // @Override // public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { // Integer vc = value1.getVc(); // long l = value1.getTs() + value2.getTs(); // value1.setVc(vc); // value1.setTs(l); // return value1; // } // }).print(); //maxBy keyedStream.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { Integer vc = value2.getVc(); long l = value1.getTs() + value2.getTs(); value1.setVc(vc); value1.setTs(l); return value1; } }).print(); env.execute(); } }
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
public class TransFilter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> source = env.fromElements( new WaterSensor("sensor_1", 1L, 1), new WaterSensor("sensor_1", 2L, 2), new WaterSensor("sensor_2", 2L, 2), new WaterSensor("sensor_3", 3L, 3) ); // 方式二:传入FilterFunction实现类 source.filter(new UserFilter()).print(); env.execute(); } private static class UserFilter implements FilterFunction<WaterSensor> { @Override public boolean filter(WaterSensor value) throws Exception { return value.id.equals("sensor_1"); } } }
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。
public class RichFunctionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4); source.map(new RichMapFunction<Integer, Integer>() { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始"); } @Override public Integer map(Integer value) throws Exception { return value+1; } @Override public void close() throws Exception { super.close(); System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束"); } }).print(); env.execute(); } }
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。
经过随机分区之后,得到的依然是一个DataStream。
public class ShuffleExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); KeyedStream<String, String> stringStringKeyedStream = source.keyBy(value -> value); DataStream<String> shuffle = source.shuffle(); stringStringKeyedStream.print("key"); shuffle.print("shuffle"); env.execute(); } }
轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
stream.rebalance()
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
stream.rescale()
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
stream.broadcast()
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
stream.global()
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
public class PartitionCustomDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); source.partitionCustom(new Partitioner<String>() { @Override public int partition(String key, int numPartitions) { return Integer.parseInt(key)%numPartitions; } }, new KeySelector<String, String>() { @Override public String getKey(String value) throws Exception { return value; } }).print(); env.execute(); } }
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了
案例:读取一个整数数字流,将数据流划分为奇数流和偶数流。
// 读取一个整数数字流,将数据流划分为奇数流和偶数流。 public class SplitStreamByFilter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.fromSource(KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092") .setGroupId("jajdk") .setTopics("topic_sink") .setValueOnlyDeserializer(new SimpleStringSchema()) .setStartingOffsets(OffsetsInitializer.latest()) .build(),WatermarkStrategy.noWatermarks(), "kafkasource"); source.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return Integer.parseInt(value) % 2 == 0; } }).print("偶数"); source.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return Integer.parseInt(value) % 2 ==1; } }).print("基数"); env.execute(); } }
缺点:是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。
需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。
案例:将id等于s1和s2的单独输出
//侧输出流 public class SplitStreamByOutputTag { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.fromSource(KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092") .setTopics("topic_sink") .setGroupId("yjh") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(), WatermarkStrategy.noWatermarks(), "kafasource"); SingleOutputStreamOperator<WaterSensor> map = source.map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); } }); OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)); OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)); SingleOutputStreamOperator<WaterSensor> process = map.process(new ProcessFunction<WaterSensor, WaterSensor>() { @Override public void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception { if ("s1".equals(value.id)) { ctx.output(s1, value); } else if ("s2".equals(value.id)) { ctx.output(s2, value); } else { out.collect(value); } } }); SideOutputDataStream<WaterSensor> s1Output = process.getSideOutput(s1); SideOutputDataStream<WaterSensor> s2Output = process.getSideOutput(s2); s1Output.print("s1"); s2Output.print("s2"); process.print(); env.execute(); } }
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
public class UnionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3); DataStreamSource<Integer> ds2 = env.fromElements(10, 20, 30); DataStreamSource<String> ds3 = env.fromElements("4", "5", "8"); DataStream<Integer> union = ds1.union(ds2, ds3.map(Integer::valueOf)); union.print(); env.execute(); } }
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。
public class ConnectDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> ds1 = env.fromSource(KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092") .setGroupId("yjh") .setTopics("topic_sink") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(), WatermarkStrategy.noWatermarks(), "kafkasource"); // ds1.print("ds1"); DataStreamSource<Integer> ds2 = env.fromElements(1, 2, 3, 5); ConnectedStreams<String, Integer> connect = ds1.connect(ds2); connect.map(new CoMapFunction<String, Integer, String>() { @Override public String map1(String value) throws Exception { return value; } @Override public String map2(Integer value) throws Exception { return value.toString(); } }).print(); env.execute(); } }
案例:连接两条流,输出能根据id匹配上的数据(类似inner join效果)
public class ConnectKeybyDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements( Tuple2.of(1, "a1"), Tuple2.of(1, "a2"), Tuple2.of(2, "b"), Tuple2.of(3, "c") ); DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements( Tuple3.of(1, "aa1", 1), Tuple3.of(1, "aa2", 2), Tuple3.of(2, "bb", 1), Tuple3.of(3, "cc", 1) ); ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2); ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> tuple2Tuple3ConnectedStreams = connect.keyBy(new KeySelector<Tuple2<Integer, String>, Integer>() { @Override public Integer getKey(Tuple2<Integer, String> value) throws Exception { return value.f0; } }, new KeySelector<Tuple3<Integer, String, Integer>, Integer>() { @Override public Integer getKey(Tuple3<Integer, String, Integer> value) throws Exception { return value.f0; } }); tuple2Tuple3ConnectedStreams.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() { Map<Integer, List<Tuple2<Integer,String>>> cache1 = new HashMap<>(); Map<Integer,List<Tuple3<Integer,String,Integer>>> cache2 = new HashMap<>(); @Override public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception { Integer f0 = value.f0; if(!cache1.containsKey(f0)){ List<Tuple2<Integer, String>> tuple1s = new ArrayList<>(); tuple1s.add(value); cache1.put(f0,tuple1s); }else { cache1.get(f0).add(value); } if(cache2.containsKey(f0)){ for (Tuple3<Integer, String, Integer> integerStringIntegerTuple3 : cache2.get(f0)) { out.collect("s1:" + value + "<--------->s2:" + cache2); } } } @Override public void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception { Integer f0 = value.f0; if(!cache2.containsKey(f0)){ ArrayList<Tuple3<Integer, String, Integer>> tuple3s = new ArrayList<>(); tuple3s.add(value); cache2.put(f0,tuple3s); }else { cache2.get(f0).add(value); } if(cache1.containsKey(f0)){ for (Tuple2<Integer, String> integerStringTuple2 : cache1.get(f0)) { out.collect("s2:" + value + "<--------->s1:" + cache1); } } } }).print(); env.execute(); } }
行编码: FileSink.forRowFormat(basePath,rowEncoder)。(行式去写)
批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。(列式去写)
public class SinkFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); // 数据生成器读取数据 DataStreamSource<String> source = env.fromSource(new DataGeneratorSource<String>(new GeneratorFunction<Long, String>() { @Override public String map(Long value) throws Exception { return null; } }, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1000), Types.STRING), WatermarkStrategy.noWatermarks(), "data-gensjkljads"); FileSink<String> ouput = FileSink.forRowFormat(new Path("ouput/fink.txt"), new SimpleStringEncoder<String>("UTF-8")) .withOutputFileConfig(OutputFileConfig.builder() .withPartPrefix("atguigu-") .withPartPrefix(".log") .build()) .withBucketAssigner(new DateTimeBucketAssigner<String>("yyyy-MM-dd HH", ZoneId.systemDefault())) .withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)) .withMaxPartSize(new MemorySize(1024 * 1024)).build()) .build(); source.sinkTo(ouput); env.execute(); } }
public class SinkKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); KafkaSink<String> topic_sink = KafkaSink.<String>builder() // 设置kafka链接地址 .setBootstrapServers("hadoop102:9092,hadoop103:9092") // 反序列化 .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder() .setTopic("topic_sink") //topic名称 .setValueSerializationSchema(new SimpleStringSchema()) //反序列化方式(string) .build()) // 写到kafka的一致性级别: 精准一次、至少一次 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 如果是精准一次,必须设置 事务的前缀 .setTransactionalIdPrefix("yjh-") // 检查点链接 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "") .build(); source.sinkTo(topic_sink); env.execute(); } }
自定义序列化器,实现带key的record:
public class SinkKafkaWithKey { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.setRestartStrategy(RestartStrategies.noRestart()); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); source.sinkTo(KafkaSink.<String>builder() // 链接kafka .setBootstrapServers("hadoop102:9092,hadoop103:9092") // 事务前缀 .setTransactionalIdPrefix("yjh-") // 精准消费 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 错误最大时长 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10 * 60 * 1000 + "") // 自定义序列化 .setRecordSerializer(new KafkaRecordSerializationSchema<String>() { @Nullable @Override public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) { String[] split = element.split(","); byte[] key = split[0].getBytes(StandardCharsets.UTF_8); byte[] value = element.getBytes(StandardCharsets.UTF_8); return new ProducerRecord<byte[], byte[]>("topic_sink",key,value); } }) .build()); env.execute(); } }
@Data @NoArgsConstructor @AllArgsConstructor public class WaterSensor { public String id; public Long ts; public Integer vc; }
public class SinkMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); } }); SinkFunction<WaterSensor> sink = JdbcSink.sink("insert into ws(id,ts,vc) values(?,?,?)", new JdbcStatementBuilder<WaterSensor>() { @Override public void accept(PreparedStatement preparedStatement, WaterSensor t) throws SQLException { preparedStatement.setString(1, t.getId()); preparedStatement.setLong(2, t.getTs()); preparedStatement.setInt(3, t.getVc()); } }, JdbcExecutionOptions.builder() .withMaxRetries(3) .withBatchSize(100) .withBatchIntervalMs(3000) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://1.94.41.70:3306/flink_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("000000") .withConnectionCheckTimeoutSeconds(60) .build()); map.addSink(sink); env.execute(); } }
stream.addSink(new MySinkFunction<String>());
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口。
使用场景:滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。 定义滑动窗口的参数有两个:除去窗口大小 (window size)之外,还有一个“滑动步长(window slide)它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率,当滑动步长小于窗口大小时,滑动窗口就会出现重叠这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide) 来决定 使用场景:滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(size = slide)滑动窗口适合计算结果更新频率非常高的场景
会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小 (size),那说明还在保持会话,它们就属于同一个窗口;如果ap大于size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。
会话窗口的长度不固定起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(sessiongap 在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。
“全局窗口”这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器(Trigger )
1.按键分区
stream.keyBy(...) .window(...)
2.非按键分区
stream.windowAll(...)
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)
1.滚动处理时间窗口
窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法.of()。
stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(...)
这里.of()方法需要传入一个Time类型的参数size,表示滚动窗口的大小,我们这里创建了一个长度为5秒的滚动窗口。
另外,.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。
2.滚动处理时间窗口
窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法.of()。
stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(...)
这里.of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。
3.处理时间会话窗口
窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。
stream.keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
这里.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。
另外,还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。
4.滚动事件时间窗口
窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。
stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)
5.滑动事件时间窗口
窗口分配器由类SlidingEventTimeWindows提供,用法与滑动处理事件窗口完全一致。
stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(...)
6.事件时间会话窗口
窗口分配器由类EventTimeSessionWindows提供,用法与处理事件会话窗口完全一致。
stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
(1)滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。
stream.keyBy(...) .countWindow(10)
我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口。
(2)滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...) .countWindow(10,3)
我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果。
3)全局窗口
全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供。
stream.keyBy(...) .window(GlobalWindows.create());
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
1.归约函数(ReduceFunction)
public class WindowReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); WaterSensor waterSensor = new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); return waterSensor; }); KeyedStream<WaterSensor, String> keyedStream = map.keyBy(new KeySelector<WaterSensor, String>() { @Override public String getKey(WaterSensor value) throws Exception { return value.id; } }); SingleOutputStreamOperator<WaterSensor> reduce = keyedStream.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { return new WaterSensor(value1.id, System.currentTimeMillis(), value1.vc + value2.vc); } }); reduce.print(); env.execute(); } }
2.聚合函数(AggregateFunction)
public class WindowAggregateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }); KeyedStream<WaterSensor, String> stream = map.keyBy(new KeySelector<WaterSensor, String>() { @Override public String getKey(WaterSensor value) throws Exception { return value.id; } }); WindowedStream<WaterSensor, String, TimeWindow> window = stream.window(TumblingProcessingTimeWindows.of(Time.seconds(2L))); SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() { // 创建累加器 初始化累加器 @Override public Integer createAccumulator() { System.out.println("创建累加器"); return 0; } // 计算存储,聚合逻辑 @Override public Integer add(WaterSensor value, Integer accumulator) { System.out.println("调用add方法,value=" + value); return accumulator + value.getVc(); } // 获取最终结果,窗口触发时输出 @Override public String getResult(Integer accumulator) { System.out.println("调用getResult方法"); return accumulator.toString(); } // 只有会话窗口才会用到(一般用前三个) @Override public Integer merge(Integer a, Integer b) { System.out.println("调用merge方法 "); return null; } }); aggregate.print(); env.execute(); } }
sensorWS .process()
public class WindowProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.fromSource(KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092") .setStartingOffsets(OffsetsInitializer.latest()) .setGroupId("yjh") .setTopics("topic_sink") .setValueOnlyDeserializer(new SimpleStringSchema()) .build(), WatermarkStrategy.noWatermarks(), "kafasource"); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }); KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = map.keyBy(WaterSensor::getId); WindowedStream<WaterSensor, String, TimeWindow> window = waterSensorStringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long start = context.window().getStart(); long end = context.window().getEnd(); String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); String data = elements.toString(); out.collect("窗口开始时间:" + windowStart + ",窗口结束时间:" + windowEnd + ",数据个数:" + count + "数据:" + data); } }); process.print(); env.execute(); } }
public class WindowAggregateAndProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.fromSource(KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092") .setTopics("topic_sink") .setGroupId("yjh") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(), WatermarkStrategy.noWatermarks(), "kafkaSource"); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); WaterSensor waterSensor = new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); return waterSensor; }); KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = map.keyBy(WaterSensor::getId); WindowedStream<WaterSensor, String, TimeWindow> window = waterSensorStringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(20))); SingleOutputStreamOperator<String> aggregate = window.aggregate(new MyAgg(), new Mypocess()); aggregate.print(); env.execute(); } private static class MyAgg implements AggregateFunction<WaterSensor,Integer,String> { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { return value.getVc() + accumulator; } @Override public String getResult(Integer accumulator) { return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { return null; } } private static class Mypocess implements WindowFunction<String,String,String,TimeWindow> { @Override public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { long start = window.getStart(); long end = window.getEnd(); long count = input.spliterator().estimateSize(); String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS"); String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS"); out.collect("窗口的大小为["+startWindow+","+endWindow+"),个数为:"+count+"数据为"+input.toString()); } } }
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...) .window(...) .trigger(new MyTrigger())
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...) .window(...) .evictor(new MyEvictor())
事件时间:一个是数据产生的时间(时间戳Timestamp)
处理时间:数据真正被处理的时刻
在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
水位线是基于数据的时间戳生成的
水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
水位线可以通过设置延迟,,来保证正确处理乱序数据
一个水位线Watermmark(t),表示在当前流中事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’<t的数据
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
WatermarkStrategy.forMonotonousTimestamps()
public class WatermarkMonoDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }); SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = map.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { System.out.println(element.toString() + "<----->" + recordTimestamp); return element.getTs() * 1000L; } })); KeyedStream<WaterSensor, String> keyBy = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); WindowedStream<WaterSensor, String, TimeWindow> window = keyBy.window(TumblingEventTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { return value.getVc()+accumulator; } @Override public String getResult(Integer accumulator) { return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { return null; } }, new WindowFunction<String, String, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { String start = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss.SSS"); String end = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss.SSS"); long num = input.spliterator().estimateSize(); out.collect("窗口:["+start+","+end+")"+",个数:"+num+",数据:"+input.toString()); } }); aggregate.print(); env.execute(); } }
WatermarkStrategy. forBoundedOutOfOrderness()
public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); WaterSensor waterSensor = new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); return waterSensor; }); SingleOutputStreamOperator<WaterSensor> watermarks = map .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { System.out.println(element.getTs()*100); return element.getTs()*100; } })); KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = watermarks.keyBy(WaterSensor::getId); WindowedStream<WaterSensor, String, TimeWindow> window = waterSensorStringKeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(1))); SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { return value.getVc() + accumulator; } @Override public String getResult(Integer accumulator) { return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { return null; } }, new WindowFunction<String, String, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { String startWindow = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss.SSS"); String envWindow = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss.SSS"); long num = input.spliterator().estimateSize(); out.collect("窗口:[" + startWindow + "," + envWindow + ")" + ",个数:" + num + ",数据:" + input.toString()); }}); aggregate.print(); env.execute(); }; }
public class CustomPeriodicWatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); WaterSensor waterSensor = new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); return waterSensor; }); SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = map.assignTimestampsAndWatermarks(new CustomWatermarkStrategy()); waterSensorSingleOutputStreamOperator.print(); env.execute(); } private static class CustomWatermarkStrategy implements WatermarkStrategy<WaterSensor> { @Override public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<WaterSensor>() { private Long delayTime = 3000L; private Long maxTs = -Long.MAX_VALUE+delayTime+1L; @Override public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) { System.out.println("eventTimestamp:"+eventTimestamp); long maxTs = Math.max(event.getTs(), eventTimestamp); System.out.println(maxTs); } @Override public void onPeriodicEmit(WatermarkOutput output) { System.out.println("水位线时间:" + (maxTs - delayTime - 1L)); output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } }; } @Override public TimestampAssigner<WaterSensor> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } }; } } }
public class WatermarkIdlenessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); DataStream<String> stringDataStream = source.partitionCustom(new Partitioner<String>() { @Override public int partition(String key, int numPartitions) { return Integer.parseInt(key) % numPartitions; } }, new KeySelector<String, String>() { @Override public String getKey(String value) throws Exception { System.out.println(value); return value; } }); stringDataStream.map(Integer::parseInt) .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))) .keyBy(value -> value % 2) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() { @Override public void process(Integer integer, ProcessWindowFunction<Integer, String, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception { String start = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS"); String end = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS"); int count = elements.spliterator().characteristics(); out.collect("key=" + integer + "的窗口[" + start + "," + end + ")包含" + count + "条数据===>" + elements.toString()); } }) .print(); env.execute(); } }
1.推迟水位线推进
例:水位线延迟10s推进
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
2.设置窗口延迟关闭
.allowedLateness()
例:窗口延迟3秒关闭
.window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3))
3.使用测输出流来娄底
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateWS)
例:
public class WatermarkLateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); // env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777); SingleOutputStreamOperator<WaterSensor> map = source.map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.parseInt(split[2])); }); SingleOutputStreamOperator<WaterSensor> watermarks = map.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { System.out.println(element.getTs() ); return element.getTs() * 1000L ; } })); KeyedStream<WaterSensor, String> keyedStream = watermarks.keyBy(WaterSensor::getId); OutputTag<WaterSensor> waterSensorOutputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class)); WindowedStream<WaterSensor, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(waterSensorOutputTag); SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { String start = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss"); String end = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss"); System.out.println(start); System.out.println(end); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + start + "," + end + ")包含" + count + "条数据===>" + elements.toString()); } }); process.print("正常数据"); process.getSideOutput(waterSensorOutputTag).printToErr(); env.execute(); } }
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
public class WindowJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); SingleOutputStreamOperator<Tuple2<String, Integer>> stream1 = env.fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("c", 6), Tuple2.of("b", 3), Tuple2.of("d", 2) ) .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() { @Override public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) { return element.f1 * 1000; } })); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> stream2 = env.fromElements( Tuple3.of("a", 1, 1), Tuple3.of("b", 5, 1), Tuple3.of("c", 6, 6), Tuple3.of("d", 7, 15), Tuple3.of("e", 1, 13), Tuple3.of("b", 9, 3), Tuple3.of("a", 1, 2) ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() { @Override public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) { return element.f2 * 1000; } })); DataStream<String> join = stream1.join(stream2) .where(value -> value.f0) .equalTo(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { @Override public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception { // System.out.println(first + "<----->" + second); return first + "<----->" + second; } }); join.print(); env.execute(); } }
间隔联结目前只支持事件时间语义。
1.正常使用(无乱序)
public class IntervalJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .fromElements( Tuple3.of("a", 1, 1), Tuple3.of("a", 11, 1), Tuple3.of("b", 2, 1), Tuple3.of("b", 12, 1), Tuple3.of("c", 14, 1), Tuple3.of("d", 15, 1) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); // TODO interval join //1. 分别做keyby,key其实就是关联条件 KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0); //2. 调用 interval join ks1.intervalJoin(ks2) .between(Time.seconds(-2), Time.seconds(2)) .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { /** * 两条流的数据匹配上,才会调用这个方法 * @param left ks1的数据 * @param right ks2的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { // 进入这个方法,是关联上的数据 out.collect(left + "<------>" + right); } }) .print(); env.execute(); } }
2.处理迟到数据
public class IntervalJoinWithLateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); SingleOutputStreamOperator<WaterSensor> stream1 = env.socketTextStream("hadoop102", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } })); SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.socketTextStream("hadoop102", 8888) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] split = value.split(","); return Tuple2.of(split[0], Integer.valueOf(split[1])); } }) .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() { @Override public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) { return element.f1; } })); SingleOutputStreamOperator<String> process = stream1.keyBy(WaterSensor::getId) .intervalJoin(stream2.keyBy(value -> value.f0)) .between(Time.milliseconds(-3), Time.milliseconds(3)) .process(new ProcessJoinFunction<WaterSensor, Tuple2<String, Integer>, String>() { @Override public void processElement(WaterSensor left, Tuple2<String, Integer> right, ProcessJoinFunction<WaterSensor, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception { out.collect(left.toString() + "," + right.toString()); } }); process.print(); env.execute(); } }
Flink提供了8个不同的处理函数:
(1)ProcessFunction
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
(2)KeyedProcessFunction
对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。
(3)ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
(4)ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
(5)CoProcessFunction
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
(6)ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
(7)BroadcastProcessFunction
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。
(8)KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。
案例:
public class KeyedProcessTimerDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); KeyedStream<WaterSensor, String> stream = env.socketTextStream("hadoop102", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs()*1000L; } })) .keyBy(WaterSensor::getId); stream.process(new KeyedProcessFunction<String, WaterSensor, String>() { @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { // 获取当前数据的key String currentKey = ctx.getCurrentKey(); System.out.println("key的信息:"+currentKey); // 1.定时器注册 TimerService timerService = ctx.timerService(); // 注册定时器: 处理时间 // timerService.registerProcessingTimeTimer(); // 删除定时器: 事件时间 // timerService.deleteEventTimeTimer(); // 删除定时器: 处理时间 // timerService.deleteProcessingTimeTimer(); // 获取当前处理时间:系统时间 timerService.currentProcessingTime(); // 获取当前的 watermark timerService.currentWatermark(); // Long timestamp = ctx.timestamp(); //数据中提取出来的事件时间 timerService.registerEventTimeTimer(5000L); System.out.println("当前key=" + currentKey + ",当前时间=" + timestamp + ",注册了一个5s的定时器"); } @Override public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, String>.OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); String currentKey = ctx.getCurrentKey(); System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发"); } }); env.execute(); } }
注意:TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次(key 相同,key不同的话就会执行多次)
需求:开全窗口,设置滑动窗口,10s一个窗口,5s的滑动步长,统计在10秒内vc最多的个数
1.使用全窗口:
public class ProcessAllWindowTopNDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); SingleOutputStreamOperator<WaterSensor> stream1 = env.socketTextStream("hadoop102", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); stream1.print(); AllWindowedStream<WaterSensor, TimeWindow> windowAll = stream1.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))); SingleOutputStreamOperator<String> process = windowAll.process(new MyprocessAll()); process.print(); env.execute(); } public static class MyprocessAll extends ProcessAllWindowFunction<WaterSensor,String,TimeWindow>{ @Override public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { Map<Integer, Integer> map = new HashMap<>(); for (WaterSensor element : elements) { Integer vc = element.getVc(); if(map.containsKey(vc)){ map.put(vc,map.get(vc)+1); }else { map.put(vc,1); } } List<Tuple2<Integer, Integer>> list = new ArrayList<>(); for (Integer integer : map.keySet()) { list.add(Tuple2.of(integer,map.get(integer))); } list.sort(new Comparator<Tuple2<Integer, Integer>>() { @Override public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) { return o2.f1 - o1.f1; } }); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < Math.min(2,list.size()); i++) { Tuple2<Integer, Integer> integerIntegerTuple2 = list.get(i); String start = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:ss:mm"); String end = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:ss:mm"); buffer.append("窗口:[" + start + "," + end + ")"); buffer.append("\n"); buffer.append("第"+ (i+1) +"名:vc=" + integerIntegerTuple2.f0 + " 数量:" + integerIntegerTuple2.f1); buffer.append("\n"); } out.collect(buffer.toString()); } } }
2.先keyby在计算
public class KeyedProcessFunctionTopNDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); KeyedStream<WaterSensor, Integer> keyStream = env.socketTextStream("hadoop102", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })) .keyBy(WaterSensor::getVc); keyStream.print(); WindowedStream<WaterSensor, Integer, TimeWindow> windowStream = keyStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))); SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = windowStream.aggregate(new AggregateFunction<WaterSensor, Integer, Integer>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { return 1 + accumulator; } @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return null; } }, new WindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>() { @Override public void apply(Integer integer, TimeWindow window, Iterable<Integer> input, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception { out.collect(Tuple3.of(integer, input.iterator().next(), window.getStart())); } }); KeyedStream<Tuple3<Integer, Integer, Long>, Long> keyByStream = aggregate.keyBy(value -> value.f2); SingleOutputStreamOperator<String> process = keyByStream.process(new KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>() { List<Tuple3<Integer, Integer, Long>> list = new ArrayList<>(); @Override public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception { String winkey = DateFormatUtils.format(ctx.getCurrentKey(), "yyyy-MM-dd HH:mm:ss"); list.add(value); list.sort(new Comparator<Tuple3<Integer, Integer, Long>>() { @Override public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) { return o2.f1 - o1.f1; } }); // System.out.println(list.toString()); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < Math.min(2,list.size()); i++) { buffer.append("窗口大小:" + winkey.toString()); buffer.append(",key值" + list.get(i).f0.toString() + ",排名"+(i+1)+":值为:" + list.get(i).f1.toString()); buffer.append("\n"); buffer.append("-----------------------------------------------"); buffer.append("\n"); } out.collect(buffer.toString()); } }); process.print(); env.execute(); } }
需求:对每个传感器,vc超过10的输出告警信息
public class SideOutputDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); OutputTag<String> warn = new OutputTag<>("warn", Types.STRING); SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop102", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })) .keyBy(WaterSensor::getVc) .process(new KeyedProcessFunction<Integer, WaterSensor, WaterSensor>() { @Override public void processElement(WaterSensor value, KeyedProcessFunction<Integer, WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception { if (value.getVc() > 10) { ctx.output(warn, "当前水位=" + value.getVc() + ",大于阈值10!!!"); } out.collect(value); } }); stream.print("正常流:"); stream.getSideOutput(warn).printToErr("warn"); env.execute(); } }
值状态相当与java中的基本类型,存储的就是一个值
T value():获取当前状态的值;
update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。
需求:检测每种传感器的水位值(vc),如果连续的两个水位值超过10,就输出报警。
public class KeyedListStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); // env.fromSource(KafkaSource.builder().build(), WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"kafkasource"); KeyedStream<WaterSensor, String> stream = env.socketTextStream("1.94.41.70", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } })) .keyBy(WaterSensor::getId); stream.process(new KeyedProcessFunction<String, WaterSensor, String>() { ValueState<Integer> lastValue; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); lastValue = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT)); } @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { Integer value1 = lastValue.value() == null ? 0 : lastValue.value(); Integer vc = value.getVc(); if(Math.abs(vc-value1) > 10 ){ out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + value1 + ",相差超过10!!!!"); } lastValue.update(vc); } }) .print(); env.execute(); } }
Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>;
update(List<T> values):传入一个列表values,直接对状态进行覆盖;
add(T value):在状态列表中添加一个元素value;
addAll(List<T> values):向列表中添加多个元素,以列表values形式传入。
需求:针对每种传感器(vc)输出最高的3个水位值
public class KeyedListStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.socketTextStream("1.94.41.70",7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0],Long.valueOf(split[1]),Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3L)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } })) .keyBy(WaterSensor::getId) .process(new KeyedProcessFunction<String, WaterSensor, String>() { ListState<Integer> vcList ; @Override public void open(Configuration parameters) throws Exception { vcList = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcList", Types.INT)); } @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { vcList.add(value.getVc()); List<Integer> List = new ArrayList<>(); for (Integer vc : vcList.get()) { List.add(vc); } List.sort(new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } }); if(List.size() > 3){ for (int i = 3; i < List.size(); i++) { List.remove(i); } } vcList.update(List); out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + List.toString()); } }) .print(); env.execute(); } }
UV get(UK key):传入一个key作为参数,查询对应的value值;
put(UK key, UV value):传入一个键值对,更新key对应的value值;
putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;
remove(UK key):将指定key对应的键值对删除;
boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。
另外,MapState也提供了获取整个映射相关信息的方法;
Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
boolean isEmpty():判断映射是否为空,返回一个boolean值。
案例:统计每种传感器(vc)每种水位值出现的次数。
public class KeyedMapStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.socketTextStream("1.94.41.70",7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0],Long.valueOf(split[1]),Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } })) .keyBy(value -> value.id) .process(new KeyedProcessFunction<String, WaterSensor, String>() { MapState<Integer, Integer> mapState; @Override public void open(Configuration parameters) throws Exception { mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("mapState", Types.INT, Types.INT)); } @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { if(mapState.contains(value.getVc())){ mapState.put(value.getVc(),mapState.get(value.getVc())+1) ; }else { mapState.put(value.getVc(),1); } StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("======================================"); stringBuffer.append("\n"); stringBuffer.append("传感器id为" + value.getId() + "\n"); for (Map.Entry<Integer, Integer> entry : mapState.entries()) { stringBuffer.append(entry.toString()); } out.collect(stringBuffer.toString()); } }) .print(); env.execute(); } }
需求:计算每种传感器(vc)的水位和
public class keyedReducingStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.socketTextStream("1.94.41.70",7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0],Long.valueOf(split[1]),Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } })) .keyBy(value -> value.id) .process(new KeyedProcessFunction<String, WaterSensor, String>() { ReducingState<Integer> reducing; @Override public void open(Configuration parameters) throws Exception { reducing = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("reducing", new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }, Types.INT)); } @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { reducing.add(value.getVc()); out.collect("key的值:"+value.id +",vc"+"的总和"+reducing.get().toString()); } } ).print(); env.execute(); } }
需求:计算每种传感器的平均水位
public class KeyedAggregatingStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.socketTextStream("1.94.41.70",7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0],Long.valueOf(split[1]),Integer.valueOf(split[2])); }) .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3L)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs(); } })) .keyBy(WaterSensor::getId) .process(new KeyedProcessFunction<String, WaterSensor, String>() { AggregatingState<Integer, Double> aggState; @Override public void open(Configuration parameters) throws Exception { aggState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("aggState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0, 0); } @Override public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1); } @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { return accumulator.f0 * 1D / accumulator.f1; } @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { return null; } }, Types.TUPLE(Types.INT, Types.INT))); } @Override public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception { aggState.add(value.getVc()); out.collect("key为:"+value.id+",vc的平均值为:"+aggState.get().toString()); } }) .print(); env.execute(); } }
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。
案例:在map算子中计算数据的个数。
public class OperatorListStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); // env.fromSource(KafkaSource.builder().build(), WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"kafkasource"); DataStreamSource<String> source = env.socketTextStream("1.94.41.70", 7777); SingleOutputStreamOperator<Long> map = source.map(new MyCountMapFunction()); map.print(); env.execute(); } private static class MyCountMapFunction implements MapFunction<String,Long> , CheckpointedFunction { Long count = 0L; ListState<Long> state; @Override public Long map(String value) throws Exception { return count++; } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("snapshotState...."); state.clear(); state.add(count); } /** * 初始化方法 * @param context * @throws Exception */ @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("initializeState......"); state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("name", Types.LONG)); if (context.isRestored()) { for (Long c : state.get()) { count += c; } } } } }
算子状态中, list 与 unionlist的区别:并行度改变时,怎么重新分配状态
1、List状态:轮询均分 给 新的 并行子任务
2、unionlist状态: 原先的多个子任务的状态,合并成一份完整的。 给新的并行子任务 ,每人一份完整的
需求:水位(DataDS流)超过指定的阈值发送告警,阈值可以动态修改(Config流)。
public class OperatorBroadcastStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); SingleOutputStreamOperator<WaterSensor> DataDS = env.socketTextStream("1.94.41.70", 7777) .map(value -> { String[] split = value.split(","); return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])); }); DataStreamSource<String> ConfigDS = env.socketTextStream("1.94.41.70", 8888); MapStateDescriptor<String, Integer> broad = new MapStateDescriptor<>("broad", Types.STRING, Types.INT); BroadcastStream<String> broadcast = ConfigDS.broadcast(broad); BroadcastConnectedStream<WaterSensor, String> connect = DataDS.connect(broadcast); connect.process(new BroadcastProcessFunction<WaterSensor, String, String>() { @Override public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception { ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broad); Integer broad1 = broadcastState.get("broad"); Integer vc = value.getVc(); int broad2 = broad1 == null ? 0 : broad1; if (vc > broad2){ out.collect(value + ",水位超过指定的阈值:" + broad2 + "!!!"); } } @Override public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception { BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broad); broadcastState.put("broad",Integer.valueOf(value)); } }) .print(); env.execute(); } }
1.负责管理 本地状态 2.hashmap
存在 TM的 JVM的堆内存,读写快,缺点是存不了太多(受限与TaskManager的内存)
rocksdb
存在 TM所在节点的rocksdb数据库,存到磁盘中,写--序列化,读--反序列化读写相对慢一些,可以存很大的状态 3.配置方式 1)配置文件 默认值 ink-conf.yaml
2)代码中指定
hashmap
env.setStateBackend(new HashMapStateBackend());
rocksdb
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>${flink.version}</version> </dependency>
env.setStateBackend(new EmbeddedRocksDBStateBackend());
3)提交参数指定
flink run-application -t yarn-application -p3 -Dstate.backend.type=rocksdb -c 全类名- jar包
在流处理中,我们可以用存档读档的思路,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。
精准一次
至少一次
总结:
1.Barrier对齐: 一个Task 收到|所有上游 同一个编号的 barrier之后,才会对自己的本地状态做 备份
精准一次: 在barrier对齐过程中,barrier后面的数据 阻塞等待(不会越过barrier)
至少一次: 在barrier对齐过程中,先到的barrier,其后面的数据 不阻塞 接者计算 2.非Barrier对齐:一个Task 收到 第一个 barrier时,就开始 执行备份,能保证 准一次(fLink 1.11出的新法)
先到的barrier,将 本地状态 备份, 其后面的数据接者计算输出
未到的barrier,其前面的数据 接着计算输出,同时 也保存到备份中
最后一个barrier到达 该Task时,这个Task的备份结束
案例:
borrier对齐
// 1.启动检查点:默认是borrier对齐的,周期为5s,精确一次 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 2.指定检查点的存储位置 checkpointConfig.setCheckpointStorage("htfs://hadoop102:8020/chk"); // 3.checkpoint的超时时间:默认10分钟 checkpointConfig.setCheckpointTimeout(60000); // 4.同时运行中的checkpoint的最大数量,默认是1,一般都是1 checkpointConfig.setMaxConcurrentCheckpoints(1); // 5.最小等待间隔:上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置>0,并发就会变成1 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 6.取消作业时,checkpoint的数据,是否保留在外部系统 // DELETE_ON_CANCELLATION:动cancel时,删除存在外部系统的chk-xx目录(如果是程序笑然挂掉,不会删) checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); // 7.允许checkpint 连续失败的次数,默认 0---> 表示checkpoint一失败,job就挂掉 checkpointConfig.setTolerableCheckpointFailureNumber(10);
因为存储到hadoop中,所以添加hadoop依赖,但是打包不要打在包中
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.4</version> <scope>provided</scope> </dependency>
borrier非对齐:
// 1.启动检查点:默认是borrier对齐的,周期为5s,精确一次 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 2.指定检查点的存储位置 checkpointConfig.setCheckpointStorage("htfs://hadoop102:8020/chk"); // 3.checkpoint的超时时间:默认10分钟 checkpointConfig.setCheckpointTimeout(60000); // 4.同时运行中的checkpoint的最大数量,默认是1,一般都是1 checkpointConfig.setMaxConcurrentCheckpoints(1); // 5.最小等待间隔:上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置>0,并发就会变成1 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 6.取消作业时,checkpoint的数据,是否保留在外部系统 // DELETE_ON_CANCELLATION:动cancel时,删除存在外部系统的chk-xx目录(如果是程序笑然挂掉,不会删) checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 7.允许checkpint 连续失败的次数,默认 0---> 表示checkpoint一失败,job就挂掉 checkpointConfig.setTolerableCheckpointFailureNumber(10); // TODO 开启 非对齐检查点(barrier非对齐) // 开启的要求:checkpoint模式必须是精确一次,最大并发必须设为1 checkpointConfig.enableUnalignedCheckpoints(); // 开启非对齐检查点才生效: 默认0, 表示一开始就直接用 非对齐的检查点 // 如果大于0,一开始用 对齐的检查点(barrier对齐),对齐的时间超过这个参数,自动切换成 非对齐检查点 (barrier非对齐) checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));
从 1.15 开始,不管hashmap还是rocksdb 状态后端都可以通过开启changelog实现通用的增量checkpoint。
注意事项
(1)目前标记为实验性功能,开启后可能会造成资源消耗增大:
HDFS上保存的文件数变多
消耗更多的IO带宽用于上传变更日志
更多的CPU用于序列化状态更改
TaskManager使用更多内存来缓存状态更改
(2)使用限制:
Checkpoint的最大并发必须为1
从 Flink 1.15 开始,只有文件系统的存储类型实现可用(memory测试阶段)
不支持 NO_CLAIM 模式
使用方式
(1)方式一:配置文件指定
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem \# 存储 changelog 数据 dstl.dfs.base-path: hdfs://hadoop102:8020/changelog execution.checkpointing.max-concurrent-checkpoints: 1 execution.savepoint-restore-mode: CLAIM
(2)方式二:在代码中设置
需要引入依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-changelog</artifactId> <version>${flink.version}</version> <scope>runtime</scope> </dependency>
开启changelog:
env.enableChangelogStateBackend(true);
Configuration config = new Configuration(); config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
保存点与检查点最大的区别,就是触发的时机。检查点是由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。
一句话理解:检查点是flink自己定时保存,保存点是我们自己使用命令保存
建议:(生产环境也要写的)
对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。
DataStream<String> stream = env .addSource(new StatefulSource()).uid("source-id") .map(new StatefulMapper()).uid("mapper-id") .print();
保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。
(1)创建保存点
要在命令行中为运行的作业创建一个保存点镜像,只需要执行:
bin/flink savepoint :jobId [:targetDirectory]
这里jobId需要填充要做镜像保存的作业ID,目标路径targetDirectory可选,表示保存点存储的路径。
对于保存点的默认路径,可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定:
state.savepoints.dir: hdfs:///flink/savepoints
当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:
bin/flink stop --savepointPath [:targetDirectory] :jobId
(2)从保存点重启应用
我们已经知道,提交启动一个Flink作业,使用的命令是flink run;现在要从保存点重启一个应用,其实本质是一样的:
bin/flink run -s :savepointPath [:runArgs]
这里只要增加一个-s参数,指定保存点的路径就可以了,其它启动时的参数还是完全一样的,如果是基于yarn的运行模式还需要加上 -yid application-id。我们在第三章使用web UI进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。
使用savepoint恢复状态的时候,也可以更换状态后端。但是有一点需要注意的是,不要在代码中指定状态后端了, 通过配置文件来配置或者-D 参数配置。
打包时,服务器上有的就provided,可能遇到依赖问题,报错:javax.annotation.Nullable找不到,可以导入如下依赖:
<dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <version>1.3.9</version> </dependency>
(1)提交flink作业
bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar
(2)停止flink作业时,触发保存点
方式一:stop优雅停止并触发保存点,要求source实现StoppableFunction接口
bin/flink stop -p savepoint 路径 job-id -yid application-id
方式二:cancel立即停止并触发保存点
bin/flink cancel -s savepoint路径 job-id -yid application-id
案例中source是socket,不能用stop
bin/flink cancel -s hdfs://hadoop102:8020/sp cffca338509ea04f38f03b4b77c8075c -yid application_1681871196375_0001
(3)从savepoint恢复作业,同时修改状态后端
bin/flink run-application -d -t yarn-application -s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 -Dstate.backend=rocksdb -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar
(4)从保存下来的checkpoint恢复作业
bin/flink run-application -d -t yarn-application -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.atguigu.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar
如果停止作业时,忘了触发保存点也不用担心,现在版本的flink支持从保留在外部系统的checkpoint恢复作业,但是恢复时不支持切换状态后端。
注意:flinksql与hivesql很多类似,这里就写我不熟悉和hivesql不一致的地方
1.启动flink
前提:先启动hadoop集群
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
2.启动Flink的sqlclient
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session
1)结果显示模式
#默认table,还可以设置为tableau、changelog SET sql-client.execution.result-mode=tableau;
3)执行环境
SET execution.runtime-mode=streaming; #默认streaming,也可以设置batch
4)默认并行度
SET parallelism.default=1;
5)设置状态TTL
SET table.exec.state.ttl=1000;
6)通过sql文件初始化
(1)创建sql文件
vim conf/sql-client-init.sql SET sql-client.execution.result-mode=tableau; CREATE DATABASE mydatabase;
(2)启动时,指定sql文件
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( ... );
ts AS PROCTIME()
CREATE TABLE EventTable( user STRING, url STRING, ts AS PROCTIME() ) WITH ( ... );
1)创建数据库
(1)语法
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name [COMMENT database_comment] WITH (key1=val1, key2=val2, ...)
(2)案例
CREATE DATABASE db_flink;
2查询数据库
(1)查询所有数据库
SHOW DATABASES
(2)查询当前数据库
SHOW CURRENT DATABASE
3)修改数据库
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
4)删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
RESTRICT:删除非空数据库会触发异常。默认启用
CASCADE:删除非空数据库也会删除所有相关的表和函数。
DROP DATABASE db_flink2;
5)切换当前数据库
USE database_name;
1)创建表
(1)语法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] [ <table_constraint> ][ , ...n] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) [ LIKE source_table [( <like_options> )] | AS select_query ]
① physical_column_definition
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
② metadata_column_definition
元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。例如,我们可以使用元数据列从Kafka记录中读取和写入时间戳,用于基于时间的操作(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记)。connector和format文档列出了每个组件可用的元数据字段。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka' ... );
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样, FROM xxx 子句可省略
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, `timestamp` TIMESTAMP_LTZ(3) METADATA ) WITH ( 'connector' = 'kafka' ... );
如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致,程序运行时会自动 cast强转,但是这要求两种数据类型是可以强转的。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 将时间戳强转为 BIGINT `timestamp` BIGINT METADATA ) WITH ( 'connector' = 'kafka' ... );
默认情况下,Flink SQL planner 认为 metadata 列可以读取和写入。然而,在许多情况下,外部系统提供的只读元数据字段比可写字段多。因此,可以使用VIRTUAL关键字排除元数据列的持久化(表示只读)。
CREATE TABLE MyTable ( `timestamp` BIGINT METADATA, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `name` STRING, ) WITH ( 'connector' = 'kafka' ... );
③ computed_column_definition
计算列是使用语法column_name AS computed_column_expression生成的虚拟列。
计算列就是拿已有的一些列经过一些自定义的运算生成的新列,在物理上并不存储在表中,只能读不能写。列的数据类型从给定的表达式自动派生,无需手动声明。
CREATE TABLE MyTable ( `user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` AS price * quanitity ) WITH ( 'connector' = 'kafka' ... );
④ 定义Watermark
Flink SQL 提供了几种 WATERMARK 生产策略:
Ø 严格升序:WATERMARK FOR rowtime_column AS rowtime_column。
Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。
Ø 递增:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 。
一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。
Ø 有界无序: WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL 'string' timeUnit 。
此类策略就可以用于设置最大乱序时间,假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND ,则生成的是运行 5s 延迟的Watermark。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。
⑤ PRIMARY KEY
主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行,只支持 not enforced。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, PARYMARY KEY(user_id) not enforced ) WITH ( 'connector' = 'kafka' ... );
⑥ PARTITIONED BY
创建分区表
⑦ with语句
用于创建表的表属性,用于指定外部存储系统的元数据信息。配置属性时,表达式key1=val1的键和值都应该是字符串字面值。如下是Kafka的映射表:
CREATE TABLE KafkaTable ( `user_id` BIGINT, `name` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' )
一般 with 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的with 配置项都是不同的。
⑧ LIKE
用于基于现有表的定义创建表。此外,用户可以扩展原始表或排除表的某些部分。
可以使用该子句重用(可能还会覆盖)某些连接器属性,或者向外部定义的表添加水印。
CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE Orders_with_watermark ( -- Add watermark definition WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( -- Overwrite the startup-mode 'scan.startup.mode' = 'latest-offset' ) LIKE Orders;
⑨ AS select_statement(CTAS)
在一个create-table-as-select (CTAS)语句中,还可以通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。
CREATE TABLE my_ctas_table WITH ( 'connector' = 'kafka', ... ) AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
注意:CTAS有以下限制:
Ø 暂不支持创建临时表。
Ø 目前还不支持指定显式列。
Ø 还不支持指定显式水印。
Ø 目前还不支持创建分区表。
Ø 目前还不支持指定主键约束。
(2)简单建表示例
CREATE TABLE test( id INT, ts BIGINT, vc INT ) WITH ( 'connector' = 'print' ); CREATE TABLE test1 ( `value` STRING ) LIKE test;
2)查看表
(1)查看所有表
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
如果没有指定数据库,则从当前数据库返回表。
LIKE子句中sql pattern的语法与MySQL方言的语法相同:
Ø %匹配任意数量的字符,甚至零字符,\%匹配一个'%'字符。
Ø 只匹配一个字符,_只匹配一个''字符
(2)查看表信息
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
3)修改表
(1)修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
(2)修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
4)删除表
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。(1.13后都用TVF,没有用这个了)
分组窗口函数 | 描述 |
---|---|
TUMBLE(time_attr, interval) | 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
HOP(time_attr, interval, interval) | 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 |
SESSION(time_attr, interval) | 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 |
案例:
1) 准备数据
CREATE TABLE ws ( id INT, vc INT, pt AS PROCTIME(), --处理时间 et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间 WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.id.min' = '1', 'fields.id.max' = '3', 'fields.vc.min' = '1', 'fields.vc.max' = '100' );
2)滚动窗口示例(时间属性字段,窗口长度)
select id, TUMBLE_START(et, INTERVAL '5' SECOND) wstart, TUMBLE_END(et, INTERVAL '5' SECOND) wend, sum(vc) sumVc from ws group by id, TUMBLE(et, INTERVAL '5' SECOND);
3)滑动窗口(时间属性字段,滑动步长,窗口长度)
select id, HOP_START(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wstart, HOP_END(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND) wend, sum(vc) sumVc from ws group by id, HOP(et, INTERVAL '3' SECOND,INTERVAL '5' SECOND);
4)会话窗口(时间属性字段,会话间隔)
select id, SESSION_START(et, INTERVAL '5' SECOND) wstart, SESSION_END(et, INTERVAL '5' SECOND) wend, sum(vc) sumVc from ws group by id, SESSION(et, INTERVAL '5' SECOND);
对比GroupWindow,TVF窗口更有效和强大。包括:
1.提供更多的性能优化手段
2.支持GroupingSets语法
3.可以在window聚合中使用TopN
4.提供累积窗口
用法:
FROM TABLE( 窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL时间…) ) GROUP BY [window_start,][window_end,] --可选
SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE( TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS)) GROUP BY window_start, window_end, id;
要求: 窗口长度=滑动步长的整数倍(底层会优化成多个小滚动窗口)
SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE( HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS)) GROUP BY window_start, window_end, id;
举例:以6秒钟为一个周期,没2秒加一次,如0秒到2秒一次结果,0秒到4秒一次结果,0秒到6秒一次结果,6秒到8秒一次结果,6秒到10一次结果。。。。。。
SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE( CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS , INTERVAL '6' SECONDS)) GROUP BY window_start, window_end, id;
grouping sets多维分析
SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE( TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS)) GROUP BY window_start, window_end, rollup( (id) ) -- cube( (id) ) -- grouping sets( (id),() ) ;
SELECT agg_func(agg_col) OVER ( [PARTITION BY col1[, col2, ...]] ORDER BY time_col range_definition), ... FROM ...
ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序
PARTITION BY:标识了聚合窗口的聚合粒度
range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合,第二种为按照时间区间聚合
1.按照时间区间聚合
统计每个传感器前10秒到现在收到的水位数据条数。
SELECT id, et, vc, count(vc) OVER ( PARTITION BY id ORDER BY et RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW ) AS cnt FROM ws
或者
SELECT id, et, vc, count(vc) OVER w AS cnt, sum(vc) OVER w AS sumVC FROM ws WINDOW w AS ( PARTITION BY id ORDER BY et RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW )
2.按照行数聚合
统计每个传感器前5条到现在数据的平均水位
SELECT id, et, vc, avg(vc) OVER ( PARTITION BY id ORDER BY et ROWS BETWEEN 5 PRECEDING AND CURRENT ROW ) AS avgVC FROM ws
或者
SELECT id, et, vc, avg(vc) OVER w AS avgVC, count(vc) OVER w AS cnt FROM ws WINDOW w AS ( PARTITION BY id ORDER BY et ROWS BETWEEN 5 PRECEDING AND CURRENT ROW )
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]
ROW_NUMBER() :标识 TopN 排序子句
PARTITION BY col1[, col2...] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的 partition by key ,就是根据需求中的搜索关键词(key)做为分区
ORDER BY col1 asc|desc...] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序,可以不是时间字段,也可以降序(TopN特殊支持)
WHERE rownum <= N :这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个TopN 的查询,其中 N 代表 TopN 的条目数
[AND conditions] :其他的限制条件也可以加上
取每个传感器最高的3个水位值
select id, et, vc, rownum from ( select id, et, vc, row_number() over( partition by id order by vc desc ) as rownum from ws ) where rownum<=3;
去重,也即上文介绍到的TopN 中 row_number = 1 的场景,但是这里有一点不一样在于其排序字段一定是时间属性列,可以降序,不能是其他非时间属性的普通列。
如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1
select id, et, vc, rownum from ( select id, et, vc, row_number() over( partition by id,vc order by et ) as rownum from ws ) where rownum=1;
Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]
Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]
Regular Join 的注意事项:
实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联
流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。
案例:
CREATE TABLE ws1 ( id INT, vc INT, pt AS PROCTIME(), --处理时间 et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间 WATERMARK FOR et AS et - INTERVAL '0.001' SECOND --watermark ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.id.min' = '3', 'fields.id.max' = '5', 'fields.vc.min' = '1', 'fields.vc.max' = '100' );
内连接:
SELECT * FROM ws INNER JOIN ws1 ON ws.id = ws1.id
外连接:
SELECT * FROM ws LEFT JOIN ws1 ON ws.id = ws1.id SELECT * FROM ws RIGHT JOIN ws1 ON ws.id = ws1.id SELECT * FROM ws FULL OUTER JOIN ws1 ON ws.id = ws.id
1.两表的联结
间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。
2.联结条件
联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。
3.时间间隔限制
我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用ltime和rtime表示左右表中的时间字段:
(1)ltime = rtime
(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
案例:
SELECT * FROM ws,ws1 WHERE ws.id = ws1. id AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND
Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。
上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段。
语法:
表A JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名 ON xx.字段=别名.字段
案例:
CREATE TABLE Customers ( id INT, name STRING, country STRING, zip STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://hadoop102:3306/customerdb', 'table-name' = 'customers' ); -- order表每来一条数据,都会去mysql的customers表查找维度数据 SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
Order by
实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。
案例:
SELECT * FROM ws ORDER BY et, id desc
limit
SELECT * FROM ws LIMIT 3
在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;
1.UNION和 UNION ALL
UNION:将集合合并并且去重
UNION ALL:将集合合并,不做去重。
(SELECT id FROM ws) UNION (SELECT id FROM ws1); (SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);
2.Intersect和IntersectAll
Intersect:交集并且去重
Intersect ALL:交集不做去重
(SELECT id FROM ws) INTERSECT (SELECT id FROM ws1); (SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);
3.Except和Except All
Except:差集并且去重
Except ALL:差集不做去重
(SELECT id FROM ws) EXCEPT (SELECT id FROM ws1); (SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);
4.In 子查询
In 子查询的结果集只能有一列
SELECT id, vc FROM ws WHERE id IN ( SELECT id FROM ws1 )
上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。
System (Built-in) Functions | Apache Flink
目前 Flink 包含了以下三种 Module:
1.CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
2.HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
3.用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module
将hive的函数flink也能用
1.上传jar包到flink的lib中
2.拷贝hadoop包,解决依赖冲突
cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/
3.分发lib包到其它集群
xsync lib/
4.启动flink和启动客户端
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session
5.更改显示方式
#默认table,还可以设置为tableau、changelog SET sql-client.execution.result-mode=tableau;
6.加载hive module
load module hive with ('hive-version'='3.1.3');
7.验证是否成功
show modules; show functions;
8.案例
注意:flink中是没有split函数的,只有hive中才有
-- 可以调用hive的split函数 select split('a,b', ',');
官网地址:Kafka | Apache Flink
WITH()后面的不要手敲,去官网复制
1.准备工作
(1)将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下
(2)重启yarn-session、sql-client
2.普通Kafka表
创建kafka映射表
CREATE TABLE t1( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读 `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, id int, ts bigint , vc int ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'hadoop103:9092', 'properties.group.id' = 'ws1', -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets' 'scan.startup.mode' = 'earliest-offset', -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区 'sink.partitioner' = 'fixed', 'topic' = 'ws1', 'format' = 'json' )
插入kafka表
insert into t1(id,ts,vc) select * from source;
查询kafka表
select * from t1;
3.upsert-kafka表
如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。
1.创建upsert-kafka的映射表(必须定义主键)
CREATE TABLE t2( id int , sumVC int , primary key (id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = 'hadoop102:9092', 'topic' = 'ws2', 'key.format' = 'json', 'value.format' = 'json' )
2.插入upsert-kafka 表
insert into t2 select id, sum(vc) as sumVC from source group by id
3.查询upset-kafka表
select * from t2;
1.创建FileSystem映射表
CREATE TABLE t3( id int, ts bigint , vc int ) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop102:8020/data/t3', 'format' = 'csv' )
2.插入数据
insert into t3 select * from source
3.查询
select * from t3 where id = '1'
4.报错问题
如上报错是因为之前lib下放了sql-hive的连接器jar包,解决方案有两种:
1.将hive的连接器jar包挪走,重启yarn-session、sql-client
mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.bak
2.同10.8.3中的操作,替换planner的jar包
Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。
1.准备工作
上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:
flink-connector-jdbc-1.17-20230109.003314-120.jar
mysql-connector-j-8.0.31.jar
2.在mysql中创建表
CREATE TABLE `ws2` ( `id` int(11) NOT NULL, `ts` bigint(20) DEFAULT NULL, `vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
3.创建JDBC映射表
CREATE TABLE t4 ( id INT, ts BIGINT, vc INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector'='jdbc', 'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8', 'username' = 'root', 'password' = '000000', 'connection.max-retry-timeout' = '60s', 'table-name' = 'ws2', 'sink.buffer-flush.max-rows' = '500', 'sink.buffer-flush.interval' = '5s', 'sink.max-retries' = '3', 'sink.parallelism' = '1' );
4.查询
select * from t4
5.写入
insert into t4 select * from source
1)提交一个insert作业,可以给作业设置名称
INSERT INTO sink select * from source;
2)查看job列表
SHOW JOBS;
3)停止作业,触发savepoint
SET state.checkpoints.dir='hdfs://hadoop102:8020/chk'; SET state.savepoints.dir='hdfs://hadoop102:8020/sp'; STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;
4)从savepoint恢复
-- 设置从savepoint恢复的路径 SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-37f5e6-0013a2874f0a'; -- 之后直接提交sql,就会从savepoint恢复 --允许跳过无法还原的保存点状态 set 'execution.savepoint.ignore-unclaimed-state' = 'true';
5)恢复后重置路径
指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。
RESET execution.savepoint.path;
如果出现reset没生效的问题,可能是个bug,我们可以退出sql-client,再重新进,不需要重启flink的集群。
catalog的作用:就是每次在启动flink会话的时候,创建表以后,关闭flink会话,重启然后表就不见了,如果用hivecatalog,就能解决这个问题,重启flink后以前创建的表还在。
1.GenericInMemoryCatalog
基于内存的,默认的就是使用的是这个
2.JdbcCatalog
目前就支持Postgres Catalog和MySQL Catalog
3.HiveCatalog
重点
4.用户自定义Catalog
JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。
1)上传所需jar包到lib下
1.17的JDBC连接器还未发布到中央仓库,可以从apache snapshot仓库下载:
Index of /repositories/snapshots/org/apache/flink/flink-connector-jdbc/1.17-SNAPSHOT
cp flink-connector-jdbc-1.17-20230109.003314-120.jar /opt/module/flink-1.17.0/lib/ cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/
2)重启flink集群和sql-client
3)创建Catalog
JdbcCatalog支持以下选项:
name:必需,Catalog名称。
default-database:必需,连接到的默认数据库。
username: 必需,Postgres/MySQL帐户的用户名。
password:必需,该帐号的密码。
base-url:必需,数据库的jdbc url(不包含数据库名)
对于Postgres Catalog,是"jdbc:postgresql://<ip>:<端口>"
对于MySQL Catalog,是"jdbc: mysql://<ip>:<端口>"
CREATE CATALOG my_jdbc_catalog WITH( 'type' = 'jdbc', 'default-database' = 'test', 'username' = 'root', 'password' = '000000', 'base-url' = 'jdbc:mysql://hadoop102:3306' );
4)查看Catalog
SHOW CATALOGS; --查看当前的CATALOG SHOW CURRENT CATALOG;
5)使用指定Catalog
USE CATALOG my_jdbc_catalog; --查看当前的CATALOG SHOW CURRENT CATALOG;
1)上传所需jar包到lib下
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/ cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.0/lib/
2)更换planner依赖
只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。
mv /opt/module/flink-1.17.0/opt/flink-table-planner_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/flink-table-planner_2.12-1.17.0.jar mv /opt/module/flink-1.17.0/lib/flink-table-planner-loader-1.17.0.jar /opt/module/flink-1.17.0/opt/flink-table-planner-loader-1.17.0.jar
3)重启flink集群和sqlclient
4)启动外置的hivemetastore服务
Hive metastore必须作为独立服务运行,也就是hive-site中必须配置hive.metastore.uris
hive --service metastore &
5)创建Catalog
配置项 | 必需 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
type | Yes | (none) | String | Catalog类型,创建HiveCatalog时必须设置为'hive'。 |
name | Yes | (none) | String | Catalog的唯一名称 |
hive-conf-dir | No | (none) | String | 包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。 |
default-database | No | default | String | Hive Catalog使用的默认数据库 |
hive-version | No | (none) | String | HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。 |
hadoop-conf-dir | No | (none) | String | Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。 |
CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/opt/module/hive/conf' );
6)查看Catalog
SHOW CATALOGS; --查看当前的CATALOG SHOW CURRENT CATALOG;
7)使用指定Catalog
USE CATALOG myhive; --查看当前的CATALOG SHOW CURRENT CATALOG;
建表,退出sql-client重进,查看catalog和表还在。
8)读写Hive表
SHOW DATABASES; -- 可以看到hive的数据库 USE test; -- 可以切换到hive的数据库 SHOW TABLES; -- 可以看到hive的表 SELECT * from ws; --可以读取hive表 INSERT INTO ws VALUES(1,1,1); -- 可以写入hive表
代码有两种方式:一种是写sql,别一种是API(这种我都不用)
1.pom
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency>
2.创建表环境
方式一:
EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() // 使用流处理模式 .build(); TableEnvironment tableEnv = TableEnvironment.create(setting);
方式二:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
3.创建表
使用:executeSql
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
4.表的查询
使用:sqlQuery
Table aliceVisitTable = tableEnv.sqlQuery( "SELECT user, url " + "FROM EventTable " + "WHERE user = 'Alice' " );
插入数据使用:executeSql
// 将查询结果输出到OutputTable中 tableEnv.executeSql ( "INSERT INTO OutputTable " + "SELECT user, url " + "FROM EventTable " + "WHERE user = 'Alice' " );
5.输出表
输出相当与向外部表插入,使用 executeSql
tableEnv.executeSql("insert into sink select * from tmp");
6.流转表
使用:fromDataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 读取数据源 SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...) // 将数据流转换成表 Table sensorTable = tableEnv.fromDataStream(sensorDS);
// 将timestamp字段重命名为ts Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));
7.创建虚拟视图
使用:createTemporaryView
// 将数据流转换成表 Table sensorTable = tableEnv.fromDataStream(sensorDS); tableEnv.createTemporaryView("tmp", sensorTable); tableEnv.sqlQuery("select * from tmp where id > 7");
8.表转流
1.追加流
tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");
2.回撤流
tableEnv.toChangelogStream(sumTable ).print("sum");
例:
public class TableStreamDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 2L, 2), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3), new WaterSensor("s3", 4L, 4) ); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // TODO 1. 流转表 Table sensorTable = tableEnv.fromDataStream(sensorDS); tableEnv.createTemporaryView("sensor", sensorTable); Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2"); Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id"); // TODO 2. 表转流 // 2.1 追加流 tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter"); // 2.2 changelog流(结果需要更新) tableEnv.toChangelogStream(sumTable ).print("sum"); // 只要代码中调用了 DataStreamAPI,就需要 execute,否则不需要 env.execute(); } }
9.自定义函数(UDF)
标量函数(Scalar Functions):一对一
表函数(Table Functions):一对多
聚合函数(Aggregate Functions):多对一
表聚合函数(Table Aggregate Functions):多对多
1.标量函数(ScalarFunctions)
自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法,标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。
public class MyScalarFunctionDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 2L, 2), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3), new WaterSensor("s3", 4L, 4) ); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table sensorTable = tableEnv.fromDataStream(sensorDS); tableEnv.createTemporaryView("sensor", sensorTable); // TODO 2.注册函数 tableEnv.createTemporaryFunction("HashFunction", HashFunction.class); // TODO 3.调用 自定义函数 // 3.1 sql用法 // tableEnv.sqlQuery("select HashFunction(id) from sensor") // .execute() // 调用了 sql的execute,就不需要 env.execute() // .print(); // 3.2 table api用法 sensorTable .select(call("HashFunction",$("id"))) .execute() .print() } // TODO 1.定义 自定义函数的实现类 public static class HashFunction extends ScalarFunction{ // 接受任意类型的输入,返回 INT型输出 public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) { return o.hashCode(); } } }
2.表函数(TableFunctions)
要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。
public class MyTableFunctionDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> strDS = env.fromElements( "hello flink", "hello world hi", "hello java" ); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table sensorTable = tableEnv.fromDataStream(strDS, $("words")); tableEnv.createTemporaryView("str", sensorTable); // TODO 2.注册函数 tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class); // TODO 3.调用 自定义函数 // 3.1 交叉联结 tableEnv // 3.1 交叉联结 // .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))") // 3.2 带 on true 条件的 左联结 // .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true") // 重命名侧向表中的字段 .sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words)) as T(newWord,newLength) on true") .execute() .print(); } // TODO 1.继承 TableFunction<返回的类型> // 类型标注: Row包含两个字段:word和length @FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>")) public static class SplitFunction extends TableFunction<Row> { // 返回是 void,用 collect方法输出 public void eval(String str) { for (String word : str.split(" ")) { collect(Row.of(word, word.length())); } } } }
3.聚合函数(AggregateFunctions)
自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数<T, ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。
Flink SQL中的聚合函数的工作原理如下:
(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。
(2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。
(3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。
所以,每个 AggregateFunction 都必须实现以下几个方法:
w createAccumulator()
这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。
w accumulate()
这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。
w getValue()
这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。
在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。
AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。
public class MyAggregateFunctionDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 姓名,分数,权重 DataStreamSource<Tuple3<String,Integer, Integer>> scoreWeightDS = env.fromElements( Tuple3.of("zs",80, 3), Tuple3.of("zs",90, 4), Tuple3.of("zs",95, 4), Tuple3.of("ls",75, 4), Tuple3.of("ls",65, 4), Tuple3.of("ls",85, 4) ); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS, $("f0").as("name"),$("f1").as("score"), $("f2").as("weight")); tableEnv.createTemporaryView("scores", scoreWeightTable); // TODO 2.注册函数 tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class); // TODO 3.调用 自定义函数 tableEnv .sqlQuery("select name,WeightedAvg(score,weight) from scores group by name") .execute() .print(); } // TODO 1.继承 AggregateFunction< 返回类型,累加器类型<加权总和,权重总和> > public static class WeightedAvg extends AggregateFunction<Double, Tuple2<Integer, Integer>> { @Override public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) { return integerIntegerTuple2.f0 * 1D / integerIntegerTuple2.f1; } @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0, 0); } /** * 累加计算的方法,每来一行数据都会调用一次 * @param acc 累加器类型 * @param score 第一个参数:分数 * @param weight 第二个参数:权重 */ public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){ acc.f0 += score * weight; // 加权总和 = 分数1 * 权重1 + 分数2 * 权重2 +.... acc.f1 += weight; // 权重和 = 权重1 + 权重2 +.... } } }
4.表聚合函数(TableAggregate Functions)
自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数<T, ACC>,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:
createAccumulator()
创建累加器的方法,与AggregateFunction中用法相同。
accumulate()
聚合计算的核心方法,与AggregateFunction中用法相同。
emitValue()
所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect<T>。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。
public class MyTableAggregateFunctionDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 姓名,分数,权重 DataStreamSource<Integer> numDS = env.fromElements(3, 6, 12, 5, 8, 9, 4); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table numTable = tableEnv.fromDataStream(numDS, $("num")); // TODO 2.注册函数 tableEnv.createTemporaryFunction("Top2", Top2.class); // TODO 3.调用 自定义函数: 只能用 Table API numTable .flatAggregate(call("Top2", $("num")).as("value", "rank")) .select( $("value"), $("rank")) .execute().print(); } // TODO 1.继承 TableAggregateFunction< 返回类型,累加器类型<加权总和,权重总和> > // 返回类型 (数值,排名) =》 (12,1) (9,2) // 累加器类型 (第一大的数,第二大的数) ===》 (12,9) public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0, 0); } /** * 每来一个数据调用一次,比较大小,更新 最大的前两个数到 acc中 * * @param acc 累加器 * @param num 过来的数据 */ public void accumulate(Tuple2<Integer, Integer> acc, Integer num) { if (num > acc.f0) { // 新来的变第一,原来的第一变第二 acc.f1 = acc.f0; acc.f0 = num; } else if (num > acc.f1) { // 新来的变第二,原来的第二不要了 acc.f1 = num; } } /** * 输出结果: (数值,排名)两条最大的 * * @param acc 累加器 * @param out 采集器<返回类型> */ public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out) { if (acc.f0 != 0) { out.collect(Tuple2.of(acc.f0, 1)); } if (acc.f1 != 0) { out.collect(Tuple2.of(acc.f1, 2)); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。