
Flink CDC: A Same-Source Dual-Stream Join with the Java DataStream API



1 Overview

1.1 Case Description

This article uses Flink CDC 2.2 (the latest release at the time of writing) together with Flink 1.14 to build dual-stream join examples with the Java DataStream API.

The rough flow of a dual-stream join:

[Figure: dual-stream join flow]
The dual-stream join cases break down as follows:

Time window joins
  • Processing-time window joins

    • processing-time window inner join case

    • processing-time window outer join case

  • Event-time window joins

    • event-time window inner join case

    • event-time window outer join case

Interval joins

The overall flow of the cases:

[Figure: overall case flow]
1.2 Software Versions

The software versions used:

[Figure: software version table]

Notes:

  • The MySQL server must have binlog enabled.

  • Flink CDC project: https://github.com/ververica/flink-cdc-connectors

  • Flink website: https://flink.apache.org/

1.3 Table Schemas

Teacher table (teacher) fields:

```
[
    t_id   varchar(3)  primary key COMMENT 'primary key',
    t_name varchar(10) not null    COMMENT 'teacher name'
]
```

Course table (course) fields:

```
[
    c_id   varchar(3)  primary key COMMENT 'primary key',
    c_name varchar(20) not null    COMMENT 'course name',
    c_tid  varchar(3)  not null    COMMENT 'teacher table primary key'
]
```
1.4 Data Stream Fields

Teacher (teacher) stream fields:

```
[
  t_id:   string, // primary key id
  t_name: string, // teacher name
  op:     string, // operation type
  ts_ms:  long    // epoch milliseconds
]
```

Course (course) stream fields:

```
[
  c_id:   string, // primary key id
  c_name: string, // course name
  c_tid:  string, // teacher table primary key id
  op:     string, // operation type
  ts_ms:  long    // epoch milliseconds
]
```

2 Preparation

2.1 MySQL Data

Connect to MySQL (e.g. with Navicat) and run the following SQL to create and seed the tables:

```sql
# Create database flinkcdc_etl_test
create database flinkcdc_etl_test;
# Use database flinkcdc_etl_test
use flinkcdc_etl_test;
# Create the teacher table
-- ----------------------------
-- Table structure for teacher
-- ----------------------------
DROP TABLE IF EXISTS `teacher`;
CREATE TABLE `teacher` (
  `t_id` varchar(3) NOT NULL COMMENT 'primary key',
  `t_name` varchar(10) NOT NULL COMMENT 'teacher name',
  PRIMARY KEY (`t_id`) USING BTREE
) COMMENT = 'teacher table';
-- ----------------------------
-- Records of teacher
-- ----------------------------
INSERT INTO `teacher` VALUES ('001', '张三');
INSERT INTO `teacher` VALUES ('002', '李四');
INSERT INTO `teacher` VALUES ('003', '王五');
-- ----------------------------
-- Table structure for course
-- ----------------------------
DROP TABLE IF EXISTS `course`;
CREATE TABLE `course` (
  `c_id` varchar(3) NOT NULL COMMENT 'primary key',
  `c_name` varchar(20) NOT NULL COMMENT 'course name',
  `c_tid` varchar(3) NOT NULL COMMENT 'teacher table primary key',
  PRIMARY KEY (`c_id`) USING BTREE,
  INDEX `c_tid` (`c_tid`) USING BTREE
) COMMENT = 'course table';
-- ----------------------------
-- Records of course
-- ----------------------------
INSERT INTO `course` VALUES ('1', '语文', '001');
INSERT INTO `course` VALUES ('2', '数学', '002');
```

2.2 Project Setup
2.2.1 Project Structure

Create a Maven project in IDEA; the project structure:

[Figure: project structure]
2.2.2 Dependencies

The pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.mfox</groupId>
    <artifactId>teacher-course-etl-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.14.4</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
2.2.3 Base Classes

The OpEnum.java enum:

```java
package cn.mfox.enumeration;

/**
 * CDC "op" field types.
 *
 * @author hy
 * @version 1.0
 * @date 2022/4/08 17:28
 */
public enum OpEnum {
    /**
     * Insert
     */
    CREATE("c", "create", "insert"),
    /**
     * Update
     */
    UPDATE("u", "update", "update"),
    /**
     * Delete
     */
    DELETE("d", "delete", "delete"),
    /**
     * Read
     */
    READ("r", "read", "initial snapshot read");

    /**
     * Dictionary code
     */
    private String dictCode;
    /**
     * Dictionary value
     */
    private String dictValue;
    /**
     * Description of the code
     */
    private String description;

    OpEnum(String dictCode, String dictValue, String description) {
        this.dictCode = dictCode;
        this.dictValue = dictValue;
        this.description = description;
    }

    public String getDictCode() {
        return dictCode;
    }

    public String getDictValue() {
        return dictValue;
    }

    public String getDescription() {
        return description;
    }
}
```

The TransformUtil.java utility class:

```java
package cn.mfox.utils;

import cn.mfox.enumeration.OpEnum;
import com.alibaba.fastjson.JSONObject;

/**
 * Transformation utility.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:25
 */
public class TransformUtil {

    /**
     * Flatten the extracted record:
     * drop the redundant "before", "after" and "source" structs
     * and keep only the relevant row image.
     *
     * @param extractData raw extracted record (JSON string)
     * @return flattened JSON object
     */
    public static JSONObject formatResult(String extractData) {
        JSONObject formatDataObj = new JSONObject();
        JSONObject rawDataObj = JSONObject.parseObject(extractData);
        formatDataObj.putAll(rawDataObj);
        formatDataObj.remove("before");
        formatDataObj.remove("after");
        formatDataObj.remove("source");
        String op = rawDataObj.getString("op");
        if (OpEnum.DELETE.getDictCode().equals(op)) {
            // Delete: take the "before" row image
            formatDataObj.putAll(rawDataObj.getJSONObject("before"));
        } else {
            // All other ops: take the "after" row image
            formatDataObj.putAll(rawDataObj.getJSONObject("after"));
        }
        return formatDataObj;
    }
}
```
2.2.4 Watermark Classes

The watermark source classes:

[Figure: watermark classes in the project tree]

CourseDataStreamNoWatermark.java:

```java
package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Course table source, without a watermark.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:33
 */
public class CourseDataStreamNoWatermark {

    /**
     * Get the course table data stream.
     *
     * @param env stream execution environment
     * @return course data stream
     */
    public static DataStream<JSONObject> getCourseDataStream(StreamExecutionEnvironment env) {
        // 1. Create the Flink MySQL CDC source
        MySqlSource<String> courseSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.course")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        // 2. Read from MySQL through the CDC source
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                courseSource,
                WatermarkStrategy.noWatermarks(),
                "CourseDataStreamNoWatermark Source"
        );
        // 3. Transform into the flattened format
        DataStream<JSONObject> courseDataStream = mysqlDataStreamSource.map(
                rawData -> TransformUtil.formatResult(rawData)
        );
        return courseDataStream;
    }
}
```

CourseDataStreamWithWatermark.java:

```java
package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Course table source, with a watermark.
 * The ts_ms field is used as the event timestamp.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:33
 */
public class CourseDataStreamWithWatermark {

    /**
     * Get the course table data stream.
     *
     * @param env stream execution environment
     * @return course data stream
     */
    public static DataStream<JSONObject> getCourseDataStream(StreamExecutionEnvironment env) {
        // 1. Create the Flink MySQL CDC source
        MySqlSource<String> courseSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.course")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        // 2. Read from MySQL through the CDC source, with a 1-second
        //    bounded-out-of-orderness watermark driven by ts_ms
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                courseSource,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long recordTimestamp) {
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "CourseDataStreamWithWatermark Source"
        );
        // 3. Transform into the flattened format
        DataStream<JSONObject> courseDataStream = mysqlDataStreamSource.map(
                rawData -> TransformUtil.formatResult(rawData)
        );
        return courseDataStream;
    }
}
```

TeacherDataStreamNoWatermark.java:

```java
package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Teacher table source, without a watermark.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:33
 */
public class TeacherDataStreamNoWatermark {

    /**
     * Get the teacher table data stream.
     *
     * @param env stream execution environment
     * @return teacher data stream
     */
    public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
        // 1. Create the Flink MySQL CDC source
        MySqlSource<String> teacherSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.teacher")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        // 2. Read from MySQL through the CDC source
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                teacherSource,
                WatermarkStrategy.noWatermarks(),
                "TeacherDataStreamNoWatermark Source"
        );
        // 3. Transform into the flattened format
        DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(
                rawData -> TransformUtil.formatResult(rawData)
        );
        return teacherDataStream;
    }
}
```

TeacherDataStreamWithWatermark.java:

```java
package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Teacher table source, with a watermark.
 * The ts_ms field is used as the event timestamp.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:33
 */
public class TeacherDataStreamWithWatermark {

    /**
     * Get the teacher table data stream.
     *
     * @param env stream execution environment
     * @return teacher data stream
     */
    public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
        // 1. Create the Flink MySQL CDC source
        MySqlSource<String> teacherSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.teacher")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        // 2. Read from MySQL through the CDC source, with a 1-second
        //    bounded-out-of-orderness watermark driven by ts_ms
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                teacherSource,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String extractData, long recordTimestamp) {
                                return JSONObject.parseObject(extractData).getLong("ts_ms");
                            }
                        }
                ),
                "TeacherDataStreamWithWatermark Source"
        );
        // 3. Transform into the flattened format
        DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(
                rawData -> TransformUtil.formatResult(rawData)
        );
        return teacherDataStream;
    }
}
```

3 Time Window Joins

3.1 Processing-Time Windows
3.1.1 Processing-Time Window Inner Join

WindowInnerJoinByProcessTimeTest.java:

```java
package cn.mfox.etl.v2.join.window.inner;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamNoWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamNoWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window inner join of the teacher and course streams based on processing time.
 * <p>
 * A record is printed only when both streams produce a match.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:31
 */
public class WindowInnerJoinByProcessTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Get the teacher and course data streams
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamNoWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamNoWatermark.getCourseDataStream(env);
        // 3. Window-join the two streams and print the result
        windowInnerJoinAndPrint(teacherDataStream, courseDataStream);
        // 4. Run the job
        env.execute("WindowInnerJoinByProcessTimeTest Job");
    }

    /**
     * Window join and print.
     * Only inner join is supported: records matched within the window are
     * emitted, unmatched records are simply dropped.
     * For an outer join on windows, use the coGroup operator instead.
     *
     * @param teacherDataStream teacher data stream
     * @param courseDataStream  course data stream
     */
    private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .join(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
                .apply(
                        new JoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public JSONObject join(JSONObject teacher, JSONObject course) {
                                // Merge the two records
                                teacher.putAll(course);
                                return teacher;
                            }
                        }
                );
        teacherCourseDataStream.print("Window Inner Join By Process Time");
    }
}
```


Console output:

```
Window Inner Join By Process Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652259235842}
Window Inner Join By Process Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652259235843}
```

Conclusion:

  • Only records that match across the two streams are emitted.
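The inner-join semantics boil down to a pairing rule: two records combine only when they share a join key and land in the same tumbling window. A stdlib-only sketch of that rule (illustrative, not the Flink implementation; the `Event` type is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of tumbling-window inner-join semantics:
 * records pair up only when they share a key AND fall into the same
 * window bucket (timestamp / windowSize). Unmatched records are
 * silently dropped, which is why teacher 王五 never appears above.
 */
public class WindowInnerJoinSketch {

    public static class Event {
        final String key;
        final long ts;
        final String payload;

        public Event(String key, long ts, String payload) {
            this.key = key;
            this.ts = ts;
            this.payload = payload;
        }
    }

    public static List<String> innerJoin(List<Event> left, List<Event> right, long windowSizeMs) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                // Same tumbling-window bucket and same join key -> emit a pair
                boolean sameWindow = l.ts / windowSizeMs == r.ts / windowSizeMs;
                if (sameWindow && l.key.equals(r.key)) {
                    joined.add(l.payload + "+" + r.payload);
                }
            }
        }
        return joined;
    }
}
```

With a 3-second window, a teacher at t=1s and a matching course at t=2.5s pair up, while a teacher whose key has no course in that window produces nothing.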

3.1.2 Processing-Time Window Outer Join

WindowOuterJoinByProcessTimeTest.java:

```java
package cn.mfox.etl.v2.join.window.outer;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamNoWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamNoWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Window outer join of the teacher and course streams based on processing time:
 * unmatched records are emitted as well.
 * Implemented as a window co-group join (Window CoGroup Join).
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:31
 */
public class WindowOuterJoinByProcessTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Get the teacher and course data streams
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamNoWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamNoWatermark.getCourseDataStream(env);
        // 3. Window-join the two streams and print the result
        windowOuterJoinAndPrint(teacherDataStream, courseDataStream);
        // 4. Run the job
        env.execute("WindowOuterJoinByProcessTimeTest Job");
    }

    /**
     * Window outer join and print.
     * Uses the coGroup operator, so unmatched records are also emitted.
     *
     * @param teacherDataStream teacher data stream
     * @param courseDataStream  course data stream
     */
    private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .coGroup(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
                .apply(
                        new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void coGroup(Iterable<JSONObject> teachers,
                                                Iterable<JSONObject> courses,
                                                Collector<JSONObject> collector) {
                                // Merge every record from both sides;
                                // an empty side contributes nothing
                                JSONObject result = new JSONObject();
                                for (JSONObject teacher : teachers) {
                                    result.putAll(teacher);
                                }
                                for (JSONObject course : courses) {
                                    result.putAll(course);
                                }
                                collector.collect(result);
                            }
                        }
                );
        teacherCourseDataStream.print("Window Outer Join By Process Time");
    }
}
```


Console output:

```
Window Outer Join By Process Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652259799140}
Window Outer Join By Process Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652259799140}
Window Outer Join By Process Time> {"t_id":"003","op":"r","t_name":"王五","ts_ms":1652259799139}
```

Conclusion:

  • Matched records are emitted.

  • The unmatched teacher record is also emitted (teacher 王五 appears even though no course references him).
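The difference from the inner join comes from coGroup handing the function both record groups per key and window, even when one group is empty. A stdlib-only sketch of that merge step (illustrative; the method and parameter names are not part of the project):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative sketch of the coGroup-based outer join: for every key seen
 * on either side of a window, both record groups are folded into one
 * result, so a key present on only one side (teacher 王五) still produces
 * output.
 */
public class WindowOuterJoinSketch {

    public static List<Map<String, String>> coGroupMerge(
            Map<String, List<Map<String, String>>> teachersByKey,
            Map<String, List<Map<String, String>>> coursesByKey) {
        List<Map<String, String>> results = new ArrayList<>();
        // Visit every key seen on either side, not just the intersection
        Set<String> keys = new HashSet<>(teachersByKey.keySet());
        keys.addAll(coursesByKey.keySet());
        for (String key : keys) {
            // Mirror of the CoGroupFunction above: fold both sides into one
            // record; an empty side simply contributes no fields
            Map<String, String> merged = new HashMap<>();
            for (Map<String, String> teacher : teachersByKey.getOrDefault(key, List.of())) {
                merged.putAll(teacher);
            }
            for (Map<String, String> course : coursesByKey.getOrDefault(key, List.of())) {
                merged.putAll(course);
            }
            results.add(merged);
        }
        return results;
    }
}
```

An inner join iterates only over keys present on both sides; iterating over the union of keys is exactly what turns the window join into an outer join.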

3.1.3 Inner vs. Outer Join on Processing-Time Windows

Comparison of the processing-time window inner join and outer join:

[Figure: inner vs. outer join comparison]
3.2 Event-Time Windows
3.2.1 Event-Time Window Inner Join

WindowInnerJoinByEventTimeTest.java:

```java
package cn.mfox.etl.v2.join.window.eventtime;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window inner join of the teacher and course streams based on event time.
 * <p>
 * A record is printed only when both streams produce a match.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:31
 */
public class WindowInnerJoinByEventTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Get the teacher and course data streams
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
        // 3. Window-join the two streams and print the result
        windowInnerJoinAndPrint(teacherDataStream, courseDataStream);
        // 4. Run the job
        env.execute("WindowInnerJoinByEventTimeTest Job");
    }

    /**
     * Window join and print.
     * Only inner join is supported: records matched within the window are
     * emitted, unmatched records are simply dropped.
     * For an outer join on windows, use the coGroup operator instead.
     *
     * @param teacherDataStream teacher data stream
     * @param courseDataStream  course data stream
     */
    private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .join(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                .apply(
                        new JoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public JSONObject join(JSONObject teacher, JSONObject course) {
                                // Merge the two records
                                teacher.putAll(course);
                                return teacher;
                            }
                        }
                );
        teacherCourseDataStream.print("Window Inner Join By Event Time");
    }
}
```
3.2.2 Event-Time Window Outer Join

WindowOuterJoinByEventTimeTest.java:

```java
package cn.mfox.etl.v2.join.window.eventtime;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Window outer join of the teacher and course streams based on event time:
 * unmatched records are emitted as well.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:31
 */
public class WindowOuterJoinByEventTimeTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Get the teacher and course data streams
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
        // 3. Window-join the two streams and print the result
        windowOuterJoinAndPrint(teacherDataStream, courseDataStream);
        // 4. Run the job
        env.execute("WindowOuterJoinByEventTimeTest Job");
    }

    /**
     * Window outer join and print.
     * Uses the coGroup operator, so unmatched records are also emitted.
     *
     * @param teacherDataStream teacher data stream
     * @param courseDataStream  course data stream
     */
    private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                                DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .coGroup(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
                .apply(
                        new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void coGroup(Iterable<JSONObject> teachers,
                                                Iterable<JSONObject> courses,
                                                Collector<JSONObject> collector) {
                                // Merge every record from both sides;
                                // an empty side contributes nothing
                                JSONObject result = new JSONObject();
                                for (JSONObject teacher : teachers) {
                                    result.putAll(teacher);
                                }
                                for (JSONObject course : courses) {
                                    result.putAll(course);
                                }
                                collector.collect(result);
                            }
                        }
                );
        teacherCourseDataStream.print("Window Outer Join By Event Time");
    }
}
```
3.2.3 Verify the case

Because event time travels with the data itself, two prerequisites apply:

  • Both the event-time inner join and the event-time outer join programs must be running.

  • Rows must be inserted into the teacher and course tables to advance the watermark, so that the event-time windows can fire.
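Why the extra inserts are needed can be sketched with the usual bounded-out-of-orderness watermark arithmetic. Note the assumptions: the helper classes `TeacherDataStreamWithWatermark`/`CourseDataStreamWithWatermark` are defined outside this article section, so the exact strategy and delay they use are assumed here; the 10-second window matches the code above.

```java
// Minimal sketch of event-time window firing, assuming a
// bounded-out-of-orderness watermark strategy (delay assumed to be 0 s).
public class WatermarkSketch {
    static final long WINDOW_SIZE_MS = 10_000L;      // 10 s tumbling window
    static final long OUT_OF_ORDERNESS_MS = 0L;      // assumed delay

    // Flink's bounded-out-of-orderness generator emits maxTimestamp - delay - 1.
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - OUT_OF_ORDERNESS_MS - 1;
    }

    // Start of the tumbling window that contains a timestamp.
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // An event-time window [start, start + size) fires once the
    // watermark reaches its max timestamp, i.e. window end - 1.
    static boolean windowFires(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        long firstRowTs = 1_652_334_250_000L;
        long start = windowStart(firstRowTs);
        // With only the first row, the watermark still sits inside the window:
        System.out.println(windowFires(start, watermark(firstRowTs)));            // false
        // A later insert 10 s on pushes the watermark past the window end:
        System.out.println(windowFires(start, watermark(firstRowTs + 10_000L)));  // true
    }
}
```

Until new rows arrive, the watermark stays put and the open window never fires, which is exactly why the verification below inserts fresh data first.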

The verification steps:

  1. Start the "event-time inner join" program.

e970661a976d90e7774d98f9d3ae932b.jpeg

  2. Start the "event-time outer join" program.

d86a0d3e58708b407adb569a23098fbf.jpeg

  3. Insert rows into both tables to advance the watermark (important).

The SQL statements:

# Insert a teacher row
insert into teacher values('004','马六');
# Insert a course row
insert into course values('3','英语','003');

Screenshot of the inserts:

aa919088ab22b4490fab356ac02fe572.jpeg
  4. Console output of the "event-time inner join" program.

Output screenshot:

4226f257b64fe26c9d5740aa389525a7.jpeg

Console output:

Window Inner Join By Event Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652334256893}
Window Inner Join By Event Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652334256894}
  5. Console output of the "event-time outer join" program.

Output screenshot:

935d14dedb4fa359a8a95f5a54891166.jpeg

Console output:

Window Outer Join By Event Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652334264544}
Window Outer Join By Event Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652334264543}
Window Outer Join By Event Time> {"t_id":"003","op":"r","t_name":"王五","ts_ms":1652334264544}

  6. Comparison of the event-time window inner join and the event-time window outer join:

109077cd8febad2f2345e45e9b83025c.jpeg
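The difference between the two outputs comes down to how coGroup combines the two sides of the window. A plain-Java sketch of the coGroup body above (using `HashMap` in place of fastjson's `JSONObject`) shows why teacher 003, who has no matching course in the window, is still emitted by the outer join:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoGroupSketch {
    // Mirrors the CoGroupFunction above: merge every teacher record and
    // every course record sharing the same key into one output record.
    static Map<String, String> coGroup(List<Map<String, String>> teachers,
                                       List<Map<String, String>> courses) {
        Map<String, String> result = new HashMap<>();
        teachers.forEach(result::putAll);
        courses.forEach(result::putAll);
        return result;
    }

    // Teacher 003 with no matching course in the window: unlike a join,
    // coGroup is still invoked with an empty course side.
    static Map<String, String> demo() {
        Map<String, String> teacher = new HashMap<>();
        teacher.put("t_id", "003");
        teacher.put("t_name", "王五");
        return coGroup(List.of(teacher), new ArrayList<>());
    }

    public static void main(String[] args) {
        // Emits the teacher-only record, matching line 3 of the outer-join output.
        System.out.println(demo());
    }
}
```

With the `join`/`JoinFunction` variant, Flink only calls the function for matched pairs, so the teacher-only record never appears; with `coGroup`, the function sees each side's bucket even when the other is empty.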

4 Interval Join

4.1 Concept

Official documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/joining/#interval-join

Screenshot from the official documentation:

a99d75f3d86380742b4e99804b001184.jpeg

4.2 Interval join case

InteralJoinByEventTimeTest.java:

package cn.mfox.etl.v2.join.interal;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Interval join of the teacher stream and the course stream.
 * Interval joins support only event time, not processing time.
 *
 * @author hy
 * @version 1.0
 * @date 2022/5/6 16:31
 */
public class InteralJoinByEventTimeTest {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Build the teacher and course data streams
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
        // 3. Interval-join the two streams and print the result
        intervalJoinAndPrint(teacherDataStream, courseDataStream);
        // 4. Run the job
        env.execute("InteralJoinByEventTimeTest Job");
    }

    /**
     * Interval join and print the result.
     *
     * @param teacherDataStream teacher stream
     * @param courseDataStream  course stream
     */
    private static void intervalJoinAndPrint(DataStream<JSONObject> teacherDataStream,
                                             DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .keyBy(teacher -> teacher.getString("t_id"))
                .intervalJoin(
                        courseDataStream.keyBy(course -> course.getString("c_tid"))
                )
                .between(
                        Time.seconds(-5),
                        Time.seconds(5)
                )
                .process(
                        new ProcessJoinFunction<JSONObject, JSONObject, JSONObject>() {
                            @Override
                            public void processElement(JSONObject left, JSONObject right,
                                                       Context ctx, Collector<JSONObject> out) {
                                left.putAll(right);
                                out.collect(left);
                            }
                        }
                );
        teacherCourseDataStream.print("Interval Join By Event Time");
    }
}

Program run screenshot:

73feda3db6d35e8f4afcce55b74986de.jpeg

Console output:

Interval Join By Event Time> {"t_id":"003","op":"r","c_tid":"003","t_name":"王五","c_id":"3","c_name":"英语","ts_ms":1652335319793}
Interval Join By Event Time> {"t_id":"002","op":"r","c_tid":"002","t_name":"李四","c_id":"2","c_name":"数学","ts_ms":1652335319793}
Interval Join By Event Time> {"t_id":"001","op":"r","c_tid":"001","t_name":"张三","c_id":"1","c_name":"语文","ts_ms":1652335319793}

Conclusions:

  • Only matched records are emitted.

  • Only event time is supported.

  • Once the job is running, output appears without inserting extra rows to advance the watermark.
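The matching rule behind `between(Time.seconds(-5), Time.seconds(5))` can be sketched in plain Java: for each left (teacher) element with timestamp `leftTs`, the join emits every right (course) element with the same key whose timestamp falls in `[leftTs - 5 s, leftTs + 5 s]`; both bounds are inclusive by default.

```java
// Sketch of the interval-join matching condition used above.
public class IntervalJoinSketch {
    static final long LOWER_BOUND_MS = -5_000L;   // Time.seconds(-5)
    static final long UPPER_BOUND_MS = 5_000L;    // Time.seconds(5)

    // A right-side element joins a left-side element when
    // leftTs + lowerBound <= rightTs <= leftTs + upperBound.
    static boolean joins(long leftTs, long rightTs) {
        return rightTs >= leftTs + LOWER_BOUND_MS
            && rightTs <= leftTs + UPPER_BOUND_MS;
    }

    public static void main(String[] args) {
        long teacherTs = 1_652_335_319_793L;
        System.out.println(joins(teacherTs, teacherTs));            // same instant: true
        System.out.println(joins(teacherTs, teacherTs + 5_000L));   // at the upper bound: true
        System.out.println(joins(teacherTs, teacherTs + 5_001L));   // past the upper bound: false
    }
}
```

This also explains why the snapshot rows above all join immediately: they carry (nearly) identical timestamps, so every teacher/course pair falls inside the ±5-second interval without waiting for a window to close.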

5 Summary

A side-by-side comparison of the dual-stream JOIN approaches covered above:

257b9a3e6076c8e40268600a2ea5edd5.jpeg
