This article walks through two-stream join examples built with the Java DataStream API, using Flink CDC 2.2 (the latest release at the time of writing) on Flink 1.14:
Processing-time window joins
Processing-time window inner join example
Processing-time window outer join example
Event-time window joins
Event-time window inner join example
Event-time window outer join example
The overall flow of the example is as follows:
Software versions:
Notes:
The MySQL server must have binlog enabled.
Flink CDC project: https://github.com/ververica/flink-cdc-connectors
Flink website: https://flink.apache.org/
Teacher table (teacher) columns:
- [
- t_id varchar(3) primary key COMMENT 'primary key',
- t_name varchar(10) not null COMMENT 'teacher name',
- ]
Course table (course) columns:
- [
- c_id varchar(3) primary key COMMENT 'primary key',
- c_name varchar(20) not null COMMENT 'course name',
- c_tid varchar(3) not null COMMENT 'teacher table primary key'
- ]
Teacher table (teacher) stream fields:
- [
- t_id: string, // primary key
- t_name: string, // teacher name
- op: string, // change operation type
- ts_ms: long // epoch milliseconds timestamp
- ]
Course table (course) stream fields:
- [
- c_id: string, // primary key
- c_name: string, // course name
- c_tid: string, // teacher table primary key
- op: string, // change operation type
- ts_ms: long // epoch milliseconds timestamp
- ]
Connect to MySQL (e.g. with Navicat) and run the following SQL to create and seed the tables:
- # Create the flinkcdc_etl_test database
- create database flinkcdc_etl_test;
-
- # Switch to the flinkcdc_etl_test database
- use flinkcdc_etl_test;
-
- # Create the teacher table
- -- ----------------------------
- -- Table structure for teacher
- -- ----------------------------
- DROP TABLE IF EXISTS `teacher`;
- CREATE TABLE `teacher` (
- `t_id` varchar(3) NOT NULL COMMENT 'Primary key',
- `t_name` varchar(10) NOT NULL COMMENT 'Teacher name',
- PRIMARY KEY (`t_id`) USING BTREE
- ) COMMENT = 'Teacher table';
-
- -- ----------------------------
- -- Records of teacher
- -- ----------------------------
- INSERT INTO `teacher` VALUES ('001', '张三');
- INSERT INTO `teacher` VALUES ('002', '李四');
- INSERT INTO `teacher` VALUES ('003', '王五');
-
- -- ----------------------------
- -- Table structure for course
- -- ----------------------------
- DROP TABLE IF EXISTS `course`;
- CREATE TABLE `course` (
- `c_id` varchar(3) NOT NULL COMMENT 'Primary key',
- `c_name` varchar(20) NOT NULL COMMENT 'Course name',
- `c_tid` varchar(3) NOT NULL COMMENT 'Teacher table primary key',
- PRIMARY KEY (`c_id`) USING BTREE,
- INDEX `c_tid`(`c_tid`) USING BTREE
- ) COMMENT = 'Course table';
-
- -- ----------------------------
- -- Records of course
- -- ----------------------------
- INSERT INTO `course` VALUES ('1', '语文', '001');
- INSERT INTO `course` VALUES ('2', '数学', '002');
Screenshots of the above steps:
Create a Maven project in IDEA; the project structure looks like this:
The pom.xml file is as follows:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>cn.mfox</groupId>
- <artifactId>teacher-course-etl-demo</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <flink.version>1.14.4</flink.version>
- <scala.binary.version>2.12</scala.binary.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.23</version>
- </dependency>
-
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>2.2.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.80</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
OpEnum.java is as follows:
- package cn.mfox.enumeration;
-
- /**
- * CDC op field values (Debezium change types)
- *
- * @author hy
- * @version 1.0
- * @date 2022/4/08 17:28
- */
- public enum OpEnum {
-
- /**
- * Insert
- */
- CREATE("c", "create", "insert"),
-
- /**
- * Update
- */
- UPDATE("u", "update", "update"),
-
- /**
- * Delete
- */
- DELETE("d", "delete", "delete"),
-
- /**
- * Initial snapshot read
- */
- READ("r", "read", "initial snapshot read");
-
- /**
- * Dictionary code
- */
- private String dictCode;
-
- /**
- * Display value for the code
- */
- private String dictValue;
-
- /**
- * Description of the code
- */
- private String description;
-
- OpEnum(String dictCode, String dictValue, String description) {
- this.dictCode = dictCode;
- this.dictValue = dictValue;
- this.description = description;
- }
-
- public String getDictCode() {
- return dictCode;
- }
-
- public String getDictValue() {
- return dictValue;
- }
-
- public String getDescription() {
- return description;
- }
- }
The TransformUtil.java utility class is as follows:
- package cn.mfox.utils;
-
- import cn.mfox.enumeration.OpEnum;
- import com.alibaba.fastjson.JSONObject;
-
- /**
- * Transformation utilities
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:25
- */
- public class TransformUtil {
-
- /**
- * Flatten the extracted record:
- * strip the redundant before/after/source wrappers and keep the row image
- *
- * @param extractData raw JSON string emitted by the CDC source
- * @return flattened JSON object
- */
- public static JSONObject formatResult(String extractData) {
- JSONObject formatDataObj = new JSONObject();
- JSONObject rawDataObj = JSONObject.parseObject(extractData);
- formatDataObj.putAll(rawDataObj);
- formatDataObj.remove("before");
- formatDataObj.remove("after");
- formatDataObj.remove("source");
- String op = rawDataObj.getString("op");
- if (OpEnum.DELETE.getDictCode().equals(op)) {
- // For deletes, the row image is in the "before" block
- formatDataObj.putAll(rawDataObj.getJSONObject("before"));
- } else {
- // For all other ops, the row image is in the "after" block
- formatDataObj.putAll(rawDataObj.getJSONObject("after"));
- }
- return formatDataObj;
- }
- }
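The flattening rule above (delete keeps the `before` image, every other op keeps the `after` image) can be sketched without Flink or fastjson, using plain `java.util.Map`s standing in for the JSON objects. This is an illustration of the logic only, not the class the jobs use:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of TransformUtil.formatResult's flattening rule with plain Maps:
// "d" (delete) events keep the "before" row image, all other ops ("c", "u",
// "r") keep the "after" row image; the wrappers themselves are dropped.
public class FormatSketch {

    public static Map<String, Object> format(Map<String, Object> raw) {
        Map<String, Object> out = new HashMap<>(raw);
        Map<String, Object> before = castRow(out.remove("before"));
        Map<String, Object> after = castRow(out.remove("after"));
        out.remove("source");
        if ("d".equals(raw.get("op"))) {
            out.putAll(before);   // delete: row image lives in "before"
        } else {
            out.putAll(after);    // insert/update/read: row image in "after"
        }
        return out;
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> castRow(Object o) {
        return o == null ? new HashMap<>() : (Map<String, Object>) o;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("t_id", "001");
        after.put("t_name", "张三");
        Map<String, Object> raw = new HashMap<>();
        raw.put("op", "r");
        raw.put("ts_ms", 1652259235842L);
        raw.put("before", null);
        raw.put("after", after);
        System.out.println(format(raw));
    }
}
```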
Screenshot of the watermark-related classes:
CourseDataStreamNoWatermark.java is as follows:
- package cn.mfox.etl.v2.join.watermark;
-
- import cn.mfox.utils.TransformUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * Extracts the course table source stream, without a watermark
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:33
- */
- public class CourseDataStreamNoWatermark {
-
- /**
- * Get the course table data stream
- *
- * @param env stream execution environment
- * @return course change stream
- */
- public static DataStream<JSONObject> getCourseDataStream(StreamExecutionEnvironment env) {
- // 1. Create the Flink MySQL CDC source
- MySqlSource<String> courseSource = MySqlSource.<String>builder()
- .hostname("192.168.18.101")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("flinkcdc_etl_test")
- .tableList("flinkcdc_etl_test.course")
- .startupOptions(StartupOptions.initial())
- .deserializer(new JsonDebeziumDeserializationSchema())
- .serverTimeZone("Asia/Shanghai")
- .build();
-
- // 2. Read from MySQL with the CDC source
- DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
- courseSource,
- WatermarkStrategy.noWatermarks(),
- "CourseDataStreamNoWatermark Source"
- );
- // 3. Convert to the flattened format
- DataStream<JSONObject> courseDataStream = mysqlDataStreamSource.map(rawData -> {
- return TransformUtil.formatResult(rawData);
- });
- return courseDataStream;
- }
- }
CourseDataStreamWithWatermark.java is as follows:
- package cn.mfox.etl.v2.join.watermark;
-
- import cn.mfox.utils.TransformUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- import java.time.Duration;
-
- /**
- * Extracts the course table source stream,
- * with a watermark that uses ts_ms as the event timestamp
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:33
- */
- public class CourseDataStreamWithWatermark {
-
- /**
- * Get the course table data stream
- *
- * @param env stream execution environment
- * @return course change stream
- */
- public static DataStream<JSONObject> getCourseDataStream(StreamExecutionEnvironment env) {
- // 1. Create the Flink MySQL CDC source
- MySqlSource<String> courseSource = MySqlSource.<String>builder()
- .hostname("192.168.18.101")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("flinkcdc_etl_test")
- .tableList("flinkcdc_etl_test.course")
- .startupOptions(StartupOptions.initial())
- .deserializer(new JsonDebeziumDeserializationSchema())
- .serverTimeZone("Asia/Shanghai")
- .build();
-
- // 2. Read from MySQL with the CDC source
- DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
- courseSource,
- WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
- new SerializableTimestampAssigner<String>() {
- @Override
- public long extractTimestamp(String extractData, long recordTimestamp) {
- return JSONObject.parseObject(extractData).getLong("ts_ms");
- }
- }
- ),
- "CourseDataStreamWithWatermark Source"
- );
- // 3. Convert to the flattened format
- DataStream<JSONObject> courseDataStream = mysqlDataStreamSource.map(rawData -> {
- return TransformUtil.formatResult(rawData);
- });
- return courseDataStream;
- }
- }
TeacherDataStreamNoWatermark.java is as follows:
- package cn.mfox.etl.v2.join.watermark;
-
- import cn.mfox.utils.TransformUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * Extracts the teacher table source stream,
- * without a watermark
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:33
- */
- public class TeacherDataStreamNoWatermark {
-
- /**
- * Get the teacher table data stream
- *
- * @param env stream execution environment
- * @return teacher change stream
- */
- public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
- // 1. Create the Flink MySQL CDC source
- MySqlSource<String> teacherSource = MySqlSource.<String>builder()
- .hostname("192.168.18.101")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("flinkcdc_etl_test")
- .tableList("flinkcdc_etl_test.teacher")
- .startupOptions(StartupOptions.initial())
- .deserializer(new JsonDebeziumDeserializationSchema())
- .serverTimeZone("Asia/Shanghai")
- .build();
-
- // 2. Read from MySQL with the CDC source
- DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
- teacherSource,
- WatermarkStrategy.noWatermarks(),
- "TeacherDataStreamNoWatermark Source"
- );
- // 3. Convert to the flattened format
- DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> {
- return TransformUtil.formatResult(rawData);
- });
- return teacherDataStream;
- }
- }
TeacherDataStreamWithWatermark.java is as follows:
- package cn.mfox.etl.v2.join.watermark;
-
- import cn.mfox.utils.TransformUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- import java.time.Duration;
-
- /**
- * Extracts the teacher table source stream,
- * with a watermark that uses ts_ms as the event timestamp
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:33
- */
- public class TeacherDataStreamWithWatermark {
-
- /**
- * Get the teacher table data stream
- *
- * @param env stream execution environment
- * @return teacher change stream
- */
- public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
- // 1. Create the Flink MySQL CDC source
- MySqlSource<String> teacherSource = MySqlSource.<String>builder()
- .hostname("192.168.18.101")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("flinkcdc_etl_test")
- .tableList("flinkcdc_etl_test.teacher")
- .startupOptions(StartupOptions.initial())
- .deserializer(new JsonDebeziumDeserializationSchema())
- .serverTimeZone("Asia/Shanghai")
- .build();
-
- // 2. Read from MySQL with the CDC source
- DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
- teacherSource,
- WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
- new SerializableTimestampAssigner<String>() {
- @Override
- public long extractTimestamp(String extractData, long recordTimestamp) {
- return JSONObject.parseObject(extractData).getLong("ts_ms");
- }
- }
- ),
- "TeacherDataStreamWithWatermark Source"
- );
- // 3. Convert to the flattened format
- DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> {
- return TransformUtil.formatResult(rawData);
- });
- return teacherDataStream;
- }
- }
WindowInnerJoinByProcessTimeTest.java is as follows:
- package cn.mfox.etl.v2.join.window.inner;
-
- import cn.mfox.etl.v2.join.watermark.CourseDataStreamNoWatermark;
- import cn.mfox.etl.v2.join.watermark.TeacherDataStreamNoWatermark;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flink.api.common.functions.JoinFunction;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
-
- /**
- * Processing-time window inner join of the teacher and course tables
- * <p>
- * A record is printed only when both streams have a match
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:31
- */
- public class WindowInnerJoinByProcessTimeTest {
-
- public static void main(String[] args) throws Exception {
- // 1. Create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 2. Get the teacher and course data streams
- DataStream<JSONObject> teacherDataStream = TeacherDataStreamNoWatermark.getTeacherDataStream(env);
- DataStream<JSONObject> courseDataStream = CourseDataStreamNoWatermark.getCourseDataStream(env);
- // 3. Window-join the two streams and print the result
- windowInnerJoinAndPrint(teacherDataStream, courseDataStream);
- // 4. Execute the job
- env.execute("WindowInnerJoinByProcessTimeTest Job");
- }
-
- /**
- * Window-join the streams and print the result.
- * Window join supports only inner-join semantics: matched pairs are emitted,
- * unmatched records are dropped. For an outer join on a window, use the
- * coGroup operator instead.
- *
- * @param teacherDataStream teacher data stream
- * @param courseDataStream course data stream
- */
- private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream,
- DataStream<JSONObject> courseDataStream) {
- DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
- .join(courseDataStream)
- .where(teacher -> teacher.getString("t_id"))
- .equalTo(course -> course.getString("c_tid"))
- .window(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
- .apply(
- new JoinFunction<JSONObject, JSONObject, JSONObject>() {
- @Override
- public JSONObject join(JSONObject jsonObject,
- JSONObject jsonObject2) {
- // Merge the two records into one
- jsonObject.putAll(jsonObject2);
- return jsonObject;
- }
- }
- );
- teacherCourseDataStream.print("Window Inner Join By Process Time");
- }
- }
Run screenshot:
Console output:
- Window Inner Join By Process Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652259235842}
- Window Inner Join By Process Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652259235843}
Conclusion:
Only records that matched across the two streams are emitted.
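Why both snapshot rows land in the same output batch: a tumbling window assigns each record to the bucket `[start, start + size)`, where `start = timestamp - (timestamp % size)`. A minimal sketch of that bucketing arithmetic (modeled on the zero-offset case of Flink's `TimeWindow.getWindowStartWithOffset`, using the `ts_ms` values from the output above as stand-in timestamps):

```java
// Sketch of how a tumbling window buckets timestamps: records whose
// timestamps fall into the same [start, start + size) bucket end up in the
// same window and can therefore be joined together.
public class TumblingWindowSketch {

    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 3_000L; // Time.seconds(3) from the example above
        // The two snapshot records from the console output, 1 ms apart,
        // share the same window start:
        System.out.println(windowStart(1652259235842L, size)); // 1652259234000
        System.out.println(windowStart(1652259235843L, size)); // 1652259234000
    }
}
```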
WindowOuterJoinByProcessTimeTest.java is as follows:
- package cn.mfox.etl.v2.join.window.outer;
-
- import cn.mfox.etl.v2.join.watermark.CourseDataStreamNoWatermark;
- import cn.mfox.etl.v2.join.watermark.TeacherDataStreamNoWatermark;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flink.api.common.functions.CoGroupFunction;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
-
- /**
- * Processing-time window outer join of the teacher and course tables:
- * records without a match are emitted as well.
- * Implemented with the window coGroup operator.
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:31
- */
- public class WindowOuterJoinByProcessTimeTest {
-
- public static void main(String[] args) throws Exception {
- // 1. Create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 2. Get the teacher and course data streams
- DataStream<JSONObject> teacherDataStream = TeacherDataStreamNoWatermark.getTeacherDataStream(env);
- DataStream<JSONObject> courseDataStream = CourseDataStreamNoWatermark.getCourseDataStream(env);
- // 3. Window-join the two streams and print the result
- windowOuterJoinAndPrint(teacherDataStream, courseDataStream);
- // 4. Execute the job
- env.execute("WindowOuterJoinByProcessTimeTest Job");
- }
-
- /**
- * Window outer join and print: implemented with the coGroup operator,
- * so records without a match are emitted as well.
- *
- * @param teacherDataStream teacher data stream
- * @param courseDataStream course data stream
- */
- private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream,
- DataStream<JSONObject> courseDataStream) {
- DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
- .coGroup(courseDataStream)
- .where(teacher -> teacher.getString("t_id"))
- .equalTo(course -> course.getString("c_tid"))
- .window(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
- .apply(
- new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
- @Override
- public void coGroup(Iterable<JSONObject> iterable,
- Iterable<JSONObject> iterable1,
- Collector<JSONObject> collector) {
- JSONObject result = new JSONObject();
- // Merge the teacher records for this key (may be empty)
- for (JSONObject jsonObject : iterable) {
- result.putAll(jsonObject);
- }
- // Merge the course records for this key (may be empty)
- for (JSONObject jsonObject : iterable1) {
- result.putAll(jsonObject);
- }
- collector.collect(result);
- }
- }
- );
- teacherCourseDataStream.print("Window Outer Join By Process Time");
- }
- }
Run screenshot:
Console output:
- Window Outer Join By Process Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652259799140}
- Window Outer Join By Process Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652259799140}
- Window Outer Join By Process Time> {"t_id":"003","op":"r","t_name":"王五","ts_ms":1652259799139}
Conclusion:
Matched records are emitted.
Unmatched teacher records are emitted as well (the teacher 王五, who has no course, still appears).
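The semantic difference between `join` and `coGroup` within one window pane can be sketched with plain lists: `join` emits the cross product of matches per key, while `coGroup` is invoked once per key with both sides (either possibly empty), so a lone teacher still produces output. A simplified illustration of the two behaviors, not the operator implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the difference between window join and coGroup for one key in one
// window pane: join() emits only matched pairs; coGroup() is called even when
// one side is empty, so unmatched records still come out.
public class JoinVsCoGroupSketch {

    // Inner-join semantics: cartesian product of the two sides for one key.
    public static List<String> innerJoin(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add(l + "+" + r);
            }
        }
        return out;
    }

    // coGroup semantics: one merged result per key, even with an empty side.
    public static List<String> coGroup(List<String> left, List<String> right) {
        List<String> all = new ArrayList<>(left);
        all.addAll(right);
        List<String> out = new ArrayList<>();
        out.add(String.join("+", all));
        return out;
    }

    public static void main(String[] args) {
        // Key "003": teacher 王五 exists, but no course references him.
        List<String> teachers = List.of("王五");
        List<String> courses = List.of();
        System.out.println(innerJoin(teachers, courses)); // [] -> dropped
        System.out.println(coGroup(teachers, courses));   // [王五] -> still emitted
    }
}
```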
Comparison of the processing-time window inner join and outer join:
WindowInnerJoinByEventTimeTest.java is as follows:
- package cn.mfox.etl.v2.join.window.eventtime;
-
- import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
- import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flink.api.common.functions.JoinFunction;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
-
- /**
- * Event-time window inner join of the teacher and course tables
- * <p>
- * A record is printed only when both streams have a match
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:31
- */
- public class WindowInnerJoinByEventTimeTest {
-
- public static void main(String[] args) throws Exception {
- // 1. Create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 2. Get the teacher and course data streams
- DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
- DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
- // 3. Window-join the two streams and print the result
- windowInnerJoinAndPrint(teacherDataStream, courseDataStream);
- // 4. Execute the job
- env.execute("WindowInnerJoinByEventTimeTest Job");
- }
-
- /**
- * Window-join the streams and print the result.
- * Window join supports only inner-join semantics: matched pairs are emitted,
- * unmatched records are dropped. For an outer join on a window, use the
- * coGroup operator instead.
- *
- * @param teacherDataStream teacher data stream
- * @param courseDataStream course data stream
- */
- private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream,
- DataStream<JSONObject> courseDataStream) {
- DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
- .join(courseDataStream)
- .where(teacher -> teacher.getString("t_id"))
- .equalTo(course -> course.getString("c_tid"))
- .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
- .apply(
- new JoinFunction<JSONObject, JSONObject, JSONObject>() {
- @Override
- public JSONObject join(JSONObject jsonObject,
- JSONObject jsonObject2) {
- // Merge the two records into one
- jsonObject.putAll(jsonObject2);
- return jsonObject;
- }
- }
- );
- teacherCourseDataStream.print("Window Inner Join By Event Time");
- }
- }
WindowOuterJoinByEventTimeTest.java is as follows:
- package cn.mfox.etl.v2.join.window.eventtime;
-
- import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
- import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flink.api.common.functions.CoGroupFunction;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
-
- /**
- * Event-time window outer join of the teacher and course tables:
- * records without a match are emitted as well.
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:31
- */
- public class WindowOuterJoinByEventTimeTest {
-
- public static void main(String[] args) throws Exception {
- // 1. Create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 2. Get the teacher and course data streams
- DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
- DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
- // 3. Window-join the two streams and print the result
- windowOuterJoinAndPrint(teacherDataStream, courseDataStream);
- // 4. Execute the job
- env.execute("WindowOuterJoinByEventTimeTest Job");
- }
-
- /**
- * Window outer join and print: implemented with the coGroup operator,
- * so records without a match are emitted as well.
- *
- * @param teacherDataStream teacher data stream
- * @param courseDataStream course data stream
- */
- private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream,
- DataStream<JSONObject> courseDataStream) {
- DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
- .coGroup(courseDataStream)
- .where(teacher -> teacher.getString("t_id"))
- .equalTo(course -> course.getString("c_tid"))
- .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
- .apply(
- new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
- @Override
- public void coGroup(Iterable<JSONObject> iterable,
- Iterable<JSONObject> iterable1,
- Collector<JSONObject> collector) {
- JSONObject result = new JSONObject();
- // Merge the teacher records for this key (may be empty)
- for (JSONObject jsonObject : iterable) {
- result.putAll(jsonObject);
- }
- // Merge the course records for this key (may be empty)
- for (JSONObject jsonObject : iterable1) {
- result.putAll(jsonObject);
- }
- collector.collect(result);
- }
- }
- );
- teacherCourseDataStream.print("Window Outer Join By Event Time");
- }
- }
Because event time is carried by the data itself, verification has two preconditions:
Both the event-time inner join and the event-time outer join programs must be running.
Rows must be inserted into the teacher and course tables to advance the watermark so the event-time windows can fire.
The verification steps are:
Start the event-time inner join program.
Start the event-time outer join program.
Insert rows into both tables to advance the watermark (important).
The SQL statements:
- # Insert a teacher row
- insert into teacher value('004','马六');
-
- # Insert a course row
- insert into course value('3','英语','003');
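Why the inserts matter: with `forBoundedOutOfOrderness(Duration.ofSeconds(1))`, the watermark trails the largest `ts_ms` seen so far, and an event-time window fires only once the watermark passes its end, so new rows are needed to push it forward. A sketch of that bookkeeping (modeled on Flink's `BoundedOutOfOrdernessWatermarks`, which emits `max timestamp - bound - 1 ms`):

```java
// Sketch of bounded-out-of-orderness watermark advancement: the watermark is
// the maximum event timestamp seen so far minus the allowed lateness bound
// (minus 1 ms). Late events never move the watermark backwards, and without
// new events the watermark stalls, which is why the event-time windows above
// need fresh inserts to fire.
public class WatermarkSketch {

    private long maxTimestamp = Long.MIN_VALUE;
    private final long boundMs;

    public WatermarkSketch(long boundMs) {
        this.boundMs = boundMs;
    }

    // Feed one event timestamp; returns the current watermark afterwards.
    public long onEvent(long tsMs) {
        maxTimestamp = Math.max(maxTimestamp, tsMs);
        return maxTimestamp - boundMs - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(1_000L); // Duration.ofSeconds(1)
        long w1 = wm.onEvent(1652334256894L); // snapshot rows
        long w2 = wm.onEvent(1652334270000L); // a later insert pushes it forward
        System.out.println(w1 + " -> " + w2);
    }
}
```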
Screenshot of the inserts:
Console output of the event-time inner join
Output screenshot:
Console output:
- Window Inner Join By Event Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652334256893}
- Window Inner Join By Event Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652334256894}
Console output of the event-time outer join
Output screenshot:
Console output:
- Window Outer Join By Event Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652334264544}
- Window Outer Join By Event Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652334264543}
- Window Outer Join By Event Time> {"t_id":"003","op":"r","t_name":"王五","ts_ms":1652334264544}
Comparison of the event-time window inner join and outer join:
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/joining/#interval-join
Documentation screenshot:
InteralJoinByEventTimeTest.java is as follows:
- package cn.mfox.etl.v2.join.interal;
-
- import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
- import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
-
- /**
- * Interval join of the teacher and course tables.
- * Interval joins support only event time, not processing time.
- *
- * @author hy
- * @version 1.0
- * @date 2022/5/6 16:31
- */
- public class InteralJoinByEventTimeTest {
-
- public static void main(String[] args) throws Exception {
- // 1. Create the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 2. Get the teacher and course data streams
- DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
- DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
- // 3. Interval-join the two streams and print the result
- intervalJoinAndPrint(teacherDataStream, courseDataStream);
- // 4. Execute the job
- env.execute("InteralJoinByEventTimeTest Job");
- }
-
- /**
- * Interval-join the streams and print the result
- *
- * @param teacherDataStream teacher data stream
- * @param courseDataStream course data stream
- */
- private static void intervalJoinAndPrint(DataStream<JSONObject> teacherDataStream,
- DataStream<JSONObject> courseDataStream) {
- DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
- .keyBy(teacher -> teacher.getString("t_id"))
- .intervalJoin(
- courseDataStream.keyBy(course -> course.getString("c_tid"))
- )
- .between(
- Time.seconds(-5),
- Time.seconds(5)
- )
- .process(
- new ProcessJoinFunction<JSONObject, JSONObject, JSONObject>() {
- @Override
- public void processElement(JSONObject left, JSONObject right,
- Context ctx, Collector<JSONObject> out) {
- // Merge the matched course record into the teacher record
- left.putAll(right);
- out.collect(left);
- }
- }
- );
- teacherCourseDataStream.print("Interval Join By Event Time");
- }
- }
Run screenshot:
Console output:
- Interval Join By Event Time> {"t_id":"003","op":"r","c_tid":"003","t_name":"王五","c_id":"3","c_name":"英语","ts_ms":1652335319793}
- Interval Join By Event Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652335319793}
- Interval Join By Event Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652335319793}
Conclusions:
Only matched records are emitted.
Only event time is supported.
Output appears shortly after startup, without needing extra inserts to advance the watermark.
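The pairing rule behind `between(Time.seconds(-5), Time.seconds(5))` can be stated directly: for two records with the same key, the pair is kept when the course timestamp lies in `[teacherTs - 5s, teacherTs + 5s]` (bounds inclusive by default). A sketch of that predicate, using the `ts_ms` values from the output above:

```java
// Sketch of the interval-join predicate: a (teacher, course) pair with the
// same key survives when rightTs falls inside [leftTs + lower, leftTs + upper]
// (both bounds inclusive, matching Flink's default).
public class IntervalJoinSketch {

    public static boolean matches(long leftTsMs, long rightTsMs,
                                  long lowerMs, long upperMs) {
        return rightTsMs >= leftTsMs + lowerMs && rightTsMs <= leftTsMs + upperMs;
    }

    public static void main(String[] args) {
        long lower = -5_000L, upper = 5_000L; // between(-5s, +5s)
        // Snapshot rows share nearly the same ts_ms, so every pair matches:
        System.out.println(matches(1652335319793L, 1652335319793L, lower, upper)); // true
        // A course arriving 6s after the teacher falls outside the interval:
        System.out.println(matches(1652335319793L, 1652335325793L, lower, upper)); // false
    }
}
```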
A summary comparison of the different two-stream join approaches: